You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/08/25 21:44:01 UTC

[systemds] branch master updated: [SYSTEMDS-3102] Performance in-memory reblocks for binary inputs

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a15d4  [SYSTEMDS-3102] Performance in-memory reblocks for binary inputs
37a15d4 is described below

commit 37a15d4620f669724d3927a3b7d71cbc1a6a1a18
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Wed Aug 25 23:43:24 2021 +0200

    [SYSTEMDS-3102] Performance in-memory reblocks for binary inputs
    
    This patch makes two performance improvements to in-memory reblocks
    inside sp_rblk: (1) preferring the in-memory reblock for binary inputs
    (where the read is much faster than distributed reblocking), and
    (2) leveraging lineage items, similar to rand, to avoid cache pollution
    and unnecessary evictions.
---
 .../org/apache/sysds/hops/recompile/Recompiler.java  | 13 ++++++++++---
 .../instructions/spark/CSVReblockSPInstruction.java  |  3 ++-
 .../instructions/spark/ReblockSPInstruction.java     | 20 ++++++++++++++++++--
 .../sysds/runtime/lineage/LineageRecomputeUtils.java | 17 +++++++++++++++++
 4 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index 8bb842e..d644bf0 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -93,6 +93,7 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -1578,7 +1579,7 @@ public class Recompiler {
 				throw new DMLRuntimeException(ex);
 			}
 		}
-		
+
 		//check valid dimensions and memory requirements
 		double sp = OptimizerUtils.getSparsity(rows, cols, nnz);
 		double mem = MatrixBlock.estimateSizeInMemory(rows, cols, sp);
@@ -1593,7 +1594,8 @@ public class Recompiler {
 		long estFilesize = (long)(3.5 * mem); //conservative estimate
 		long cpThreshold = CP_REBLOCK_THRESHOLD_SIZE * 
 			OptimizerUtils.getParallelTextReadParallelism();
-		return (estFilesize < cpThreshold);
+		return (iimd.getFileFormat() == FileFormat.BINARY
+			|| estFilesize < cpThreshold); //for text conservative
 	}
 	
 	public static boolean checkCPCheckpoint(DataCharacteristics dc) {
@@ -1601,9 +1603,13 @@ public class Recompiler {
 			&& OptimizerUtils.isValidCPDimensions(dc.getRows(), dc.getCols())
 			&& !OptimizerUtils.exceedsCachingThreshold(dc.getCols(), OptimizerUtils.estimateSize(dc));
 	}
+
+	public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) {
+		executeInMemoryReblock(ec, varin, varout, null);
+	}
 	
 	@SuppressWarnings("unchecked")
-	public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) {
+	public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout, LineageItem litem) {
 		CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) ec.getCacheableData(varin);
 		CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) ec.getCacheableData(varout);
 
@@ -1618,6 +1624,7 @@ public class Recompiler {
 			
 			//set output (incl update matrix characteristics)
 			out.acquireModify(mb);
+			out.setCacheLineage(litem);
 			out.release();
 			in.release();
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
index f0c81f4..cae0fbe 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -119,8 +119,9 @@ public class CSVReblockSPInstruction extends UnarySPInstruction {
 
 		//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
 		if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {
 				Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
+			}
 			Statistics.decrementNoOfExecutedSPInst();
 			return;
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 81b50b8..b8192a9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.instructions.spark;
 
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -41,6 +42,7 @@ import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM;
 import org.apache.sysds.runtime.io.FileFormatPropertiesMM;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixCell;
@@ -48,6 +50,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
+import org.apache.sysds.runtime.util.ProgramConverter;
 import org.apache.sysds.utils.Statistics;
 
 public class ReblockSPInstruction extends UnarySPInstruction {
@@ -96,8 +99,10 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 
 		//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
 		if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
-				Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
+			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {
+				Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName(),
+					iimd.getFileFormat()==FileFormat.BINARY ? getLineageItem(ec).getValue() : null);
+			}
 			Statistics.decrementNoOfExecutedSPInst();
 			return;
 		}
@@ -256,4 +261,15 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 				+ "for ReblockSPInstruction: " + fmt.toString());
 		}
 	}
+	
+	@Override
+	public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+		//construct reblock lineage without existing createvar lineage
+		if( ec.getLineage() == null ) {
+			return Pair.of(output.getName(), new LineageItem(
+				ProgramConverter.serializeDataObject(input1.getName(), ec.getCacheableData(input1)), "cache_rblk"));
+		}
+		//default reblock w/ active lineage tracing
+		return super.getLineageItem(ec);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
index fe31b38..c51b1c1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.OpOp1;
 import org.apache.sysds.common.Types.OpOp2;
 import org.apache.sysds.common.Types.OpOp3;
@@ -58,6 +59,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
 import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
 import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysds.runtime.instructions.Instruction;
@@ -71,6 +73,7 @@ import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
+import org.apache.sysds.runtime.util.ProgramConverter;
 import org.apache.sysds.utils.Explain;
 import org.apache.sysds.utils.Explain.ExplainCounts;
 import org.apache.sysds.utils.Statistics;
@@ -200,6 +203,20 @@ public class LineageRecomputeUtils {
 					operands.put(item.getId(), input); // order preserving
 					break;
 				}
+				else if( item.getOpcode().equals("cache_rblk") ) {
+					CacheableData<?> dat = (CacheableData<?>)ProgramConverter.parseDataObject(item.getData())[1];
+					DataOp hop = new DataOp("tmp", dat.getDataType(), dat.getValueType(),
+						OpOpData.PERSISTENTREAD, dat.getFileName(), dat.getNumRows(),
+						dat.getNumColumns(), dat.getDataCharacteristics().getNonZeros(), -1);
+					hop.setFileFormat(FileFormat.BINARY);
+					hop.setInputBlocksize(dat.getBlocksize());
+					hop.setBlocksize(ConfigurationManager.getBlocksize());
+					hop.setRequiresReblock(true);
+					operands.put(item.getId(), hop);
+					break;
+				}
+				
+				
 				Instruction inst = InstructionParser.parseSingleInstruction(item.getData());
 				
 				if (inst instanceof DataGenCPInstruction) {