You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/11/28 22:48:23 UTC

[systemds] 02/02: [SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 9641173dd54ba9d43e4869483006bfc4fc66897c
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Nov 28 23:47:53 2020 +0100

    [SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames
    
    This patch fixes the Spark reblock instructions (always compiled in
    hybrid mode), which incorrectly collected federated matrices/frames
    into the driver. We now simply extend the in-memory reblock to
    preserve existing federated data objects instead of pulling them local.
---
 .../apache/sysds/hops/recompile/Recompiler.java    | 46 +++++++++-------------
 .../spark/CSVReblockSPInstruction.java             |  6 +--
 .../instructions/spark/ReblockSPInstruction.java   |  6 +--
 3 files changed, 23 insertions(+), 35 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index c785cfc..6e960e7 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -68,6 +68,7 @@ import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -80,7 +81,6 @@ import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -1568,33 +1568,25 @@ public class Recompiler
 			&& !OptimizerUtils.exceedsCachingThreshold(dc.getCols(), OptimizerUtils.estimateSize(dc));
 	}
 	
-	public static void executeInMemoryMatrixReblock(ExecutionContext ec, String varin, String varout) {
-		MatrixObject in = ec.getMatrixObject(varin);
-		MatrixObject out = ec.getMatrixObject(varout);
+	@SuppressWarnings("unchecked")
+	public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) {
+		CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) ec.getCacheableData(varin);
+		CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) ec.getCacheableData(varout);
 
-		//read text input matrix (through buffer pool, matrix object carries all relevant
-		//information including additional arguments for csv reblock)
-		MatrixBlock mb = in.acquireRead(); 
-		
-		//set output (incl update matrix characteristics)
-		out.acquireModify( mb );
-		out.release();
-		in.release();
-	}
-	
-	public static void executeInMemoryFrameReblock(ExecutionContext ec, String varin, String varout) 
-	{
-		FrameObject in = ec.getFrameObject(varin);
-		FrameObject out = ec.getFrameObject(varout);
-
-		//read text input frame (through buffer pool, frame object carries all relevant
-		//information including additional arguments for csv reblock)
-		FrameBlock fb = in.acquireRead(); 
-		
-		//set output (incl update matrix characteristics)
-		out.acquireModify( fb );
-		out.release();
-		in.release();
+		if( in.isFederated() ) {
+			out.setMetaData(in.getMetaData());
+			out.setFedMapping(in.getFedMapping());
+		}
+		else {
+			//read text input matrix (through buffer pool, matrix object carries all relevant
+			//information including additional arguments for csv reblock)
+			CacheBlock mb = in.acquireRead();
+			
+			//set output (incl update matrix characteristics)
+			out.acquireModify(mb);
+			out.release();
+			in.release();
+		}
 	}
 	
 	private static void tryReadMetaDataFileDataCharacteristics( DataOp dop )
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
index d073a3c..be4adc3 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -114,10 +114,8 @@ public class CSVReblockSPInstruction extends UnarySPInstruction {
 
 		//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
 		if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-			if( input1.getDataType() == DataType.MATRIX )
-				Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName());
-			else if( input1.getDataType() == DataType.FRAME )
-				Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName());
+			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+				Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
 			Statistics.decrementNoOfExecutedSPInst();
 			return;
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 46ab52e..a27a760 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -97,10 +97,8 @@ public class ReblockSPInstruction extends UnarySPInstruction {
 
 		//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
 		if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-			if( input1.getDataType() == DataType.MATRIX )
-				Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName());
-			else if( input1.getDataType() == DataType.FRAME )
-				Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName());
+			if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+				Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
 			Statistics.decrementNoOfExecutedSPInst();
 			return;
 		}