You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/11/28 22:48:23 UTC
[systemds] 02/02: [SYSTEMDS-2550] Fix in-memory reblock for
federated matrices/frames
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 9641173dd54ba9d43e4869483006bfc4fc66897c
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Nov 28 23:47:53 2020 +0100
[SYSTEMDS-2550] Fix in-memory reblock for federated matrices/frames
This patch fixes the Spark reblock instructions (always compiled in
hybrid mode), which incorrectly consolidated federated matrices/frames
into the driver. We now simply extend the implementation to respect
existing federated data objects.
---
.../apache/sysds/hops/recompile/Recompiler.java | 46 +++++++++-------------
.../spark/CSVReblockSPInstruction.java | 6 +--
.../instructions/spark/ReblockSPInstruction.java | 6 +--
3 files changed, 23 insertions(+), 35 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index c785cfc..6e960e7 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -68,6 +68,7 @@ import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -80,7 +81,6 @@ import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
import org.apache.sysds.runtime.instructions.cp.IntObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -1568,33 +1568,25 @@ public class Recompiler
&& !OptimizerUtils.exceedsCachingThreshold(dc.getCols(), OptimizerUtils.estimateSize(dc));
}
- public static void executeInMemoryMatrixReblock(ExecutionContext ec, String varin, String varout) {
- MatrixObject in = ec.getMatrixObject(varin);
- MatrixObject out = ec.getMatrixObject(varout);
+ @SuppressWarnings("unchecked")
+ public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) {
+ CacheableData<CacheBlock> in = (CacheableData<CacheBlock>) ec.getCacheableData(varin);
+ CacheableData<CacheBlock> out = (CacheableData<CacheBlock>) ec.getCacheableData(varout);
- //read text input matrix (through buffer pool, matrix object carries all relevant
- //information including additional arguments for csv reblock)
- MatrixBlock mb = in.acquireRead();
-
- //set output (incl update matrix characteristics)
- out.acquireModify( mb );
- out.release();
- in.release();
- }
-
- public static void executeInMemoryFrameReblock(ExecutionContext ec, String varin, String varout)
- {
- FrameObject in = ec.getFrameObject(varin);
- FrameObject out = ec.getFrameObject(varout);
-
- //read text input frame (through buffer pool, frame object carries all relevant
- //information including additional arguments for csv reblock)
- FrameBlock fb = in.acquireRead();
-
- //set output (incl update matrix characteristics)
- out.acquireModify( fb );
- out.release();
- in.release();
+ if( in.isFederated() ) {
+ out.setMetaData(in.getMetaData());
+ out.setFedMapping(in.getFedMapping());
+ }
+ else {
+ //read text input matrix (through buffer pool, matrix object carries all relevant
+ //information including additional arguments for csv reblock)
+ CacheBlock mb = in.acquireRead();
+
+ //set output (incl update matrix characteristics)
+ out.acquireModify(mb);
+ out.release();
+ in.release();
+ }
}
private static void tryReadMetaDataFileDataCharacteristics( DataOp dop )
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
index d073a3c..be4adc3 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -114,10 +114,8 @@ public class CSVReblockSPInstruction extends UnarySPInstruction {
//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
- if( input1.getDataType() == DataType.MATRIX )
- Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName());
- else if( input1.getDataType() == DataType.FRAME )
- Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName());
+ if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+ Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
Statistics.decrementNoOfExecutedSPInst();
return;
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
index 46ab52e..a27a760 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ReblockSPInstruction.java
@@ -97,10 +97,8 @@ public class ReblockSPInstruction extends UnarySPInstruction {
//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
- if( input1.getDataType() == DataType.MATRIX )
- Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName());
- else if( input1.getDataType() == DataType.FRAME )
- Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName());
+ if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() )
+ Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName());
Statistics.decrementNoOfExecutedSPInst();
return;
}