You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/07/30 21:34:39 UTC
[systemds] branch master updated: [SYSTEMDS-3079] Improved
transitive spark exec type selection
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new b59e80f [SYSTEMDS-3079] Improved transitive spark exec type selection
b59e80f is described below
commit b59e80fbc3945947c207e8d23f81506a3fecec69
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Fri Jul 30 23:33:38 2021 +0200
[SYSTEMDS-3079] Improved transitive spark exec type selection
This patch makes an additional improvement to the transitive spark exec
type selection, specifically for situations where spark instructions in
one DAG created an intermediate and the potential candidates for
transitive spark exec types are in another DAG on top of a transient
read. We now probe the symbol table if such a tread (transient read) is only
available as an RDD, and compile the candidate hops accordingly.
Furthermore, this patch also includes changes to broadcasting and
parallelizing local matrices and frames by using shallow copies where
possible. This can significantly reduce GC overheads by a shallow copy
of sparse rows if the number of columns is less than or equal to the blocksize.
On a scenario of PCA on USCensus 8x replicated w/ 100GB driver, this
change improved performance from 216s-252s (variance due to GC) down to
78s.
---
src/main/java/org/apache/sysds/hops/AggBinaryOp.java | 13 +++++++------
src/main/java/org/apache/sysds/hops/AggUnaryOp.java | 11 ++++++-----
src/main/java/org/apache/sysds/hops/DataOp.java | 9 +++++++++
.../java/org/apache/sysds/hops/recompile/Recompiler.java | 4 ++++
.../sysds/runtime/controlprogram/caching/CacheBlock.java | 15 +++++++++++++++
.../controlprogram/context/SparkExecutionContext.java | 2 +-
.../java/org/apache/sysds/runtime/data/TensorBlock.java | 5 +++++
.../runtime/instructions/spark/data/PartitionedBlock.java | 2 +-
.../org/apache/sysds/runtime/matrix/data/FrameBlock.java | 10 ++++++++--
.../org/apache/sysds/runtime/matrix/data/MatrixBlock.java | 1 +
.../test/functions/builtin/BuiltinGridSearchTest.java | 1 +
11 files changed, 58 insertions(+), 15 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
index 3b0da87..6637192 100644
--- a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
@@ -438,13 +438,14 @@ public class AggBinaryOp extends MultiThreadedHop
private boolean isApplicableForTransitiveSparkExecType(boolean left)
{
int index = left ? 0 : 1;
- return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint())
- && (!HopRewriteUtils.isTransposeOperation(getInput().get(index))
+ return !(getInput(index) instanceof DataOp && ((DataOp)getInput(index)).requiresCheckpoint())
+ && (!HopRewriteUtils.isTransposeOperation(getInput(index))
|| (left && !isLeftTransposeRewriteApplicable(true)))
- && getInput().get(index).getParent().size()==1 //bagg is only parent
- && !getInput().get(index).areDimsBelowThreshold()
- && getInput().get(index).optFindExecType() == ExecType.SPARK
- && getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate();
+ && getInput(index).getParent().size()==1 //bagg is only parent
+ && !getInput(index).areDimsBelowThreshold()
+ && (getInput(index).optFindExecType() == ExecType.SPARK
+ || (getInput(index) instanceof DataOp && ((DataOp)getInput(index)).hasOnlyRDD()))
+ && getInput(index).getOutputMemEstimate()>getOutputMemEstimate();
}
/**
diff --git a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
index 2055411..1ad6ac7 100644
--- a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
@@ -374,12 +374,13 @@ public class AggUnaryOp extends MultiThreadedHop
//single parent also in spark because it's likely cheap and reduces data transfer)
//we also allow multiple parents, if all other parents are already in Spark mode
if( transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP
- && !(getInput().get(0) instanceof DataOp) //input is not checkpoint
- && getInput().get(0).optFindExecType() == ExecType.SPARK
- && (getInput().get(0).getParent().size()==1 //uagg is only parent, or
- || getInput().get(0).getParent().stream().filter(h -> h != this)
+ && ((!(getInput(0) instanceof DataOp) //input is not checkpoint
+ && getInput(0).optFindExecType() == ExecType.SPARK)
+ || (getInput(0) instanceof DataOp && ((DataOp)getInput(0)).hasOnlyRDD()))
+ && (getInput(0).getParent().size()==1 //uagg is only parent, or
+ || getInput(0).getParent().stream().filter(h -> h != this)
.allMatch(h -> h.optFindExecType(false) == ExecType.SPARK)
- || !requiresAggregation(getInput().get(0), _direction)) ) //w/o agg
+ || !requiresAggregation(getInput(0), _direction)) ) //w/o agg
{
//pull unary aggregate into spark
_etype = ExecType.SPARK;
diff --git a/src/main/java/org/apache/sysds/hops/DataOp.java b/src/main/java/org/apache/sysds/hops/DataOp.java
index 1d2237c..8373f73 100644
--- a/src/main/java/org/apache/sysds/hops/DataOp.java
+++ b/src/main/java/org/apache/sysds/hops/DataOp.java
@@ -55,6 +55,7 @@ public class DataOp extends Hop {
//read dataop properties
private FileFormat _inFormat = FileFormat.TEXT;
private long _inBlocksize = -1;
+ private boolean _hasOnlyRDD = false;
private boolean _recompileRead = true;
@@ -254,6 +255,14 @@ public class DataOp extends Hop {
return _paramIndexMap.get(name);
}
+ public void setOnlyRDD(boolean flag) {
+ _hasOnlyRDD = flag;
+ }
+
+ public boolean hasOnlyRDD() {
+ return _hasOnlyRDD;
+ }
+
@Override
public boolean isGPUEnabled() {
return false;
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index 77e7aa8..7a6127c 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -1341,6 +1341,10 @@ public class Recompiler
d.setDim2(to.getNumColumns());
d.setNnz(to.getNnz());
}
+ if( dat instanceof CacheableData<?> ) {
+ CacheableData<?> cd = (CacheableData<?>) dat;
+ d.setOnlyRDD(!cd.isCached(true) && cd.getRDDHandle()!=null);
+ }
}
}
//special case for persistent reads with unknown size (read-after-write)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java
index 17741f8..b44c18b 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java
@@ -99,6 +99,21 @@ public interface CacheBlock extends Writable
public CacheBlock slice(int rl, int ru, int cl, int cu, CacheBlock block);
/**
+ * Slice a sub block out of the current block and write into the given output block.
+ * This method returns the passed instance if not null.
+ *
+ * @param rl row lower
+ * @param ru row upper
+ * @param cl column lower
+ * @param cu column upper
+ * @param deep enforce deep-copy
+ * @param block cache block
+ * @return sub-block of cache block
+ */
+ public CacheBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock block);
+
+
+ /**
* Merge the given block into the current block. Both blocks needs to be of equal
* dimensions and contain disjoint non-zero cells.
*
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 1f6c275..331e54f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -937,7 +937,7 @@ public class SparkExecutionContext extends ExecutionContext
int row_offset = (int)blockRow*mc.getBlocksize();
int col_offset = (int)blockCol*mc.getBlocksize();
block = mb.slice( row_offset, row_offset+maxRow-1,
- col_offset, col_offset+maxCol-1, block );
+ col_offset, col_offset+maxCol-1, false, block );
//create key-value pair
return new Tuple2<>(new MatrixIndexes(blockRow+1, blockCol+1), block);
}
diff --git a/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java b/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java
index d2167c8..51c4280 100644
--- a/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java
@@ -285,6 +285,11 @@ public class TensorBlock implements CacheBlock, Externalizable {
@Override
public CacheBlock slice(int rl, int ru, int cl, int cu, CacheBlock block) {
+ return slice(rl, ru, cl, cu, false, block);
+ }
+
+ @Override
+ public CacheBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock block) {
if( !(block instanceof TensorBlock) )
throw new RuntimeException("TensorBlock.slice(int,int,int,int,CacheBlock) CacheBlock was no TensorBlock");
TensorBlock tb = (TensorBlock) block;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java
index 4acd4c5..9f6bcc5 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java
@@ -76,7 +76,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
int j = index % ncblks;
T tmp = (T) CacheBlockFactory.newInstance(code);
return block.slice(i * _blen, Math.min((i + 1) * _blen, rlen) - 1,
- j * _blen, Math.min((j + 1) * _blen, clen) - 1, tmp);
+ j * _blen, Math.min((j + 1) * _blen, clen) - 1, false, tmp);
});
} catch(Exception ex) {
throw new RuntimeException("Failed partitioning of broadcast variable input.", ex);
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 8ee6f33..b2e0c24 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -1060,6 +1060,11 @@ public class FrameBlock implements CacheBlock, Externalizable {
(int)ixrange.colStart, (int)ixrange.colEnd, ret);
}
+ @Override
+ public FrameBlock slice(int rl, int ru, int cl, int cu, CacheBlock retCache) {
+ return slice(rl, ru, cl, cu, false, retCache);
+ }
+
/**
* Right indexing operations to slice a subframe out of this frame block.
* Note that the existing column value types are preserved.
@@ -1068,11 +1073,12 @@ public class FrameBlock implements CacheBlock, Externalizable {
* @param ru row upper index, inclusive, 0-based
* @param cl column lower index, inclusive, 0-based
* @param cu column upper index, inclusive, 0-based
+ * @param deep enforce deep-copy
* @param retCache cache block
* @return frame block
*/
@Override
- public FrameBlock slice(int rl, int ru, int cl, int cu, CacheBlock retCache) {
+ public FrameBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock retCache) {
FrameBlock ret = (FrameBlock)retCache;
// check the validity of bounds
if ( rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows()
@@ -1105,7 +1111,7 @@ public class FrameBlock implements CacheBlock, Externalizable {
ret._coldata = new Array[numCols];
//fast-path: shallow copy column indexing
- if( ret._numRows == _numRows ) {
+ if( ret._numRows == _numRows && !deep ) {
//this shallow copy does not only avoid an array copy, but
//also allows for bi-directional reuses of recodemaps
for( int j=cl; j<=cu; j++ )
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 22b7e5c..77e949e 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -4012,6 +4012,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
* @param ret output sliced out matrix block
* @return matrix block output matrix block
*/
+ @Override
public MatrixBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock ret) {
validateSliceArgument(rl, ru, cl, cu);
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java
index 6484e0c..fa06668 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java
@@ -44,6 +44,7 @@ public class BuiltinGridSearchTest extends AutomatedTestBase
@Override
public void setUp() {
+ TestUtils.clearAssertionInformation();
addTestConfiguration(TEST_NAME1,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
addTestConfiguration(TEST_NAME2,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2,new String[]{"R"}));
addTestConfiguration(TEST_NAME3,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3,new String[]{"R"}));