You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/08/25 21:44:01 UTC
[systemds] branch master updated: [SYSTEMDS-3102] Performance
in-memory reblocks for binary inputs
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 37a15d4 [SYSTEMDS-3102] Performance in-memory reblocks for binary inputs
37a15d4 is described below
commit 37a15d4620f669724d3927a3b7d71cbc1a6a1a18
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Wed Aug 25 23:43:24 2021 +0200
[SYSTEMDS-3102] Performance in-memory reblocks for binary inputs
This patch makes two performance improvements to in-memory reblocks
inside sp_rblk: (1) preferring in-memory reblock for binary inputs (where
the read is much faster than distributed reblocking), and (2) leveraging
lineage items, similar to rand, to avoid cache pollution and
unnecessary evictions.
---
.../org/apache/sysds/hops/recompile/Recompiler.java | 13 ++++++++++---
.../instructions/spark/CSVReblockSPInstruction.java | 3 ++-
.../instructions/spark/ReblockSPInstruction.java | 20 ++++++++++++++++++--
.../sysds/runtime/lineage/LineageRecomputeUtils.java | 17 +++++++++++++++++
4 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index 8bb842e..d644bf0 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -93,6 +93,7 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
import org.apache.sysds.runtime.instructions.cp.ListObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -1578,7 +1579,7 @@ public class Recompiler {
throw new DMLRuntimeException(ex);
}
}
-
+
//check valid dimensions and memory requirements
double sp = OptimizerUtils.getSparsity(rows, cols, nnz);
double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp);
@@ -1593,7 +1594,8 @@ public class Recompiler {
long estFilesize = (long)(3.5 * mem); //conservative estimate
long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE *
OptimizerUtils.getParallelTextReadParallelism();
- return (estFilesize < cpThreshold);
+ return (iimd.getFileFormat() == FileFormat.BINARY
+ || estFilesize < cpThreshold); //for text conservative
}
public static boolean checkCPCheckpoint(DataCharacteristics dc) {
@@ -1601,9 +1603,13 @@ public class Recompiler {
&& OptimizerUtils.isValidCPDimensions(dc.getRows(), dc.getCols())
&& !OptimizerUtils.exceedsCachingThreshold(dc.getCols(), OptimizerUtils.estimateSize(dc));
}
+
+ public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) {
+ executeInMemoryReblock(ec, varin, varout, null);
+ }
@SuppressWarnings("unchecked")
- public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) {
+ public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout, LineageItem litem) {
CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) ec.getCacheableData(varin);
CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) ec.getCacheableData(varout);
@@ -1618,6 +1624,7 @@ public class Recompiler {
//set output (incl update matrix characteristics)
out.acquireModify(mb);
+ out.setCacheLineage(litem);
out.release();
in.release();
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
index f0c81f4..cae0fbe 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -119,8 +119,9 @@ public class CSVReblockSPInstruction extends UnarySPInstruction {
//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
- if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+ if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {
Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
+ }
Statistics.decrementNoOfExecutedSPInst();
return;
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 81b50b8..b8192a9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.instructions.spark;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
@@ -41,6 +42,7 @@ import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM;
import org.apache.sysds.runtime.io.FileFormatPropertiesMM;
import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixCell;
@@ -48,6 +50,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataFormat;
+import org.apache.sysds.runtime.util.ProgramConverter;
import org.apache.sysds.utils.Statistics;
public class ReblockSPInstruction extends UnarySPInstruction {
@@ -96,8 +99,10 @@ public class ReblockSPInstruction extends UnarySPInstruction {
//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
- if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
- Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
+ if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {
+ Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName(),
+ iimd.getFileFormat()==FileFormat.BINARY ? getLineageItem(ec).getValue() : null);
+ }
Statistics.decrementNoOfExecutedSPInst();
return;
}
@@ -256,4 +261,15 @@ public class ReblockSPInstruction extends UnarySPInstruction {
+ "for ReblockSPInstruction: " + fmt.toString());
}
}
+
+ @Override
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ //construct reblock lineage without existing createvar lineage
+ if( ec.getLineage() == null ) {
+ return Pair.of(output.getName(), new LineageItem(
+ ProgramConverter.serializeDataObject(input1.getName(), ec.getCacheableData(input1)), "cache_rblk"));
+ }
+ //default reblock w/ active lineage tracing
+ return super.getLineageItem(ec);
+ }
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
index fe31b38..c51b1c1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.OpOp1;
import org.apache.sysds.common.Types.OpOp2;
import org.apache.sysds.common.Types.OpOp3;
@@ -58,6 +59,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysds.runtime.instructions.Instruction;
@@ -71,6 +73,7 @@ import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
+import org.apache.sysds.runtime.util.ProgramConverter;
import org.apache.sysds.utils.Explain;
import org.apache.sysds.utils.Explain.ExplainCounts;
import org.apache.sysds.utils.Statistics;
@@ -200,6 +203,20 @@ public class LineageRecomputeUtils {
operands.put(item.getId(), input); // order preserving
break;
}
+ else if( item.getOpcode().equals("cache_rblk") ) {
+ CacheableData<?> dat = (CacheableData<?>)ProgramConverter.parseDataObject(item.getData())[1];
+ DataOp hop = new DataOp("tmp", dat.getDataType(), dat.getValueType(),
+ OpOpData.PERSISTENTREAD, dat.getFileName(), dat.getNumRows(),
+ dat.getNumColumns(), dat.getDataCharacteristics().getNonZeros(), -1);
+ hop.setFileFormat(FileFormat.BINARY);
+ hop.setInputBlocksize(dat.getBlocksize());
+ hop.setBlocksize(ConfigurationManager.getBlocksize());
+ hop.setRequiresReblock(true);
+ operands.put(item.getId(), hop);
+ break;
+ }
+
+
Instruction inst = InstructionParser.parseSingleInstruction(item.getData());
if (inst instanceof DataGenCPInstruction) {