Posted to commits@systemds.apache.org by mb...@apache.org on 2021/07/30 21:34:39 UTC

[systemds] branch master updated: [SYSTEMDS-3079] Improved transitive spark exec type selection

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new b59e80f  [SYSTEMDS-3079] Improved transitive spark exec type selection
b59e80f is described below

commit b59e80fbc3945947c207e8d23f81506a3fecec69
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Fri Jul 30 23:33:38 2021 +0200

    [SYSTEMDS-3079] Improved transitive spark exec type selection
    
    This patch makes an additional improvement to the transitive spark exec
    type selection, specifically for situations where spark instructions in
    one DAG created an intermediate and the potential candidates for
    transitive spark exec types are in another DAG on top of a transient
    read. We now probe the symbol table to determine if such a tread is
    only available as an RDD, and compile the candidate hops accordingly.
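    
    As a hedged illustration (probeRDDOnly is a hypothetical helper, not
    the actual API; the real check lives in Recompiler, see diff below),
    the probe boils down to:
    
        import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
        import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
        import org.apache.sysds.runtime.instructions.cp.Data;
        
        // a transient read (tread) is an RDD-only candidate if its variable
        // has a live RDD handle but no cached local copy in the buffer pool
        static boolean probeRDDOnly(LocalVariableMap symtab, String varname) {
            Data dat = symtab.get(varname);
            if( !(dat instanceof CacheableData<?>) )
                return false;
            CacheableData<?> cd = (CacheableData<?>) dat;
            return !cd.isCached(true) && cd.getRDDHandle() != null;
        }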
    
    Furthermore, this patch also includes changes to broadcasting and
    parallelizing local matrices and frames by using shallow copies where
    possible. This can significantly reduce GC overhead via a shallow copy
    of sparse rows if the number of columns is less than or equal to the
    blocksize.
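    
    A minimal sketch of the sparse-row idea (sliceSparseShallow is a
    hypothetical helper on top of the SparseBlock API; the committed change
    instead threads a deep flag through CacheBlock.slice, see diff below):
    
        import org.apache.sysds.runtime.data.SparseBlock;
        
        // if a horizontal slice covers all columns (clen <= blocksize),
        // the sparse rows need no trimming and can be shared by reference
        static void sliceSparseShallow(SparseBlock src, SparseBlock dst,
            int rl, int ru)
        {
            for( int i = rl; i <= ru; i++ )
                dst.set(i - rl, src.get(i), false); // deep=false: share row
        }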
    
    In a scenario of PCA on USCensus (8x replicated) with a 100GB driver,
    this change improved performance from 216s-252s (variance due to GC)
    down to 78s.
---
 src/main/java/org/apache/sysds/hops/AggBinaryOp.java      | 13 +++++++------
 src/main/java/org/apache/sysds/hops/AggUnaryOp.java       | 11 ++++++-----
 src/main/java/org/apache/sysds/hops/DataOp.java           |  9 +++++++++
 .../java/org/apache/sysds/hops/recompile/Recompiler.java  |  4 ++++
 .../sysds/runtime/controlprogram/caching/CacheBlock.java  | 15 +++++++++++++++
 .../controlprogram/context/SparkExecutionContext.java     |  2 +-
 .../java/org/apache/sysds/runtime/data/TensorBlock.java   |  5 +++++
 .../runtime/instructions/spark/data/PartitionedBlock.java |  2 +-
 .../org/apache/sysds/runtime/matrix/data/FrameBlock.java  | 10 ++++++++--
 .../org/apache/sysds/runtime/matrix/data/MatrixBlock.java |  1 +
 .../test/functions/builtin/BuiltinGridSearchTest.java     |  1 +
 11 files changed, 58 insertions(+), 15 deletions(-)
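
The central API change below is an overloaded CacheBlock.slice with an
explicit deep flag. A hedged usage sketch (the random test input is made
up, but the slice signature matches the diff):

    import org.apache.sysds.runtime.matrix.data.MatrixBlock;
    
    // create a sparse 2000x1000 test matrix and slice out the first
    // 1000 rows; deep=false permits shallow sharing of the sparse rows
    // because the slice spans all columns
    MatrixBlock mb = MatrixBlock.randOperations(2000, 1000, 0.1, 0, 1, "uniform", 7);
    MatrixBlock blk = mb.slice(0, 999, 0, 999, false, new MatrixBlock());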

diff --git a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
index 3b0da87..6637192 100644
--- a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java
@@ -438,13 +438,14 @@ public class AggBinaryOp extends MultiThreadedHop
 	private boolean isApplicableForTransitiveSparkExecType(boolean left) 
 	{
 		int index = left ? 0 : 1;
-		return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint())
-			&& (!HopRewriteUtils.isTransposeOperation(getInput().get(index))
+		return !(getInput(index) instanceof DataOp && ((DataOp)getInput(index)).requiresCheckpoint())
+			&& (!HopRewriteUtils.isTransposeOperation(getInput(index))
 				|| (left && !isLeftTransposeRewriteApplicable(true)))
-			&& getInput().get(index).getParent().size()==1 //bagg is only parent
-			&& !getInput().get(index).areDimsBelowThreshold() 
-			&& getInput().get(index).optFindExecType() == ExecType.SPARK
-			&& getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate();
+			&& getInput(index).getParent().size()==1 //bagg is only parent
+			&& !getInput(index).areDimsBelowThreshold() 
+			&& (getInput(index).optFindExecType() == ExecType.SPARK
+				|| (getInput(index) instanceof DataOp && ((DataOp)getInput(index)).hasOnlyRDD()))
+			&& getInput(index).getOutputMemEstimate()>getOutputMemEstimate();
 	}
 	
 	/**
diff --git a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
index 2055411..1ad6ac7 100644
--- a/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/AggUnaryOp.java
@@ -374,12 +374,13 @@ public class AggUnaryOp extends MultiThreadedHop
 		//single parent also in spark because it's likely cheap and reduces data transfer)
 		//we also allow multiple parents, if all other parents are already in Spark mode
 		if( transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP
-			&& !(getInput().get(0) instanceof DataOp)  //input is not checkpoint
-			&& getInput().get(0).optFindExecType() == ExecType.SPARK
-			&& (getInput().get(0).getParent().size()==1 //uagg is only parent, or 
-				|| getInput().get(0).getParent().stream().filter(h -> h != this)
+			&& ((!(getInput(0) instanceof DataOp)  //input is not checkpoint
+				&& getInput(0).optFindExecType() == ExecType.SPARK)
+				|| (getInput(0) instanceof DataOp && ((DataOp)getInput(0)).hasOnlyRDD()))
+			&& (getInput(0).getParent().size()==1 //uagg is only parent, or 
+				|| getInput(0).getParent().stream().filter(h -> h != this)
 					.allMatch(h -> h.optFindExecType(false) == ExecType.SPARK)
-				|| !requiresAggregation(getInput().get(0), _direction)) ) //w/o agg
+				|| !requiresAggregation(getInput(0), _direction)) ) //w/o agg
 		{
 			//pull unary aggregate into spark 
 			_etype = ExecType.SPARK;
diff --git a/src/main/java/org/apache/sysds/hops/DataOp.java b/src/main/java/org/apache/sysds/hops/DataOp.java
index 1d2237c..8373f73 100644
--- a/src/main/java/org/apache/sysds/hops/DataOp.java
+++ b/src/main/java/org/apache/sysds/hops/DataOp.java
@@ -55,6 +55,7 @@ public class DataOp extends Hop {
 	//read dataop properties
 	private FileFormat _inFormat = FileFormat.TEXT;
 	private long _inBlocksize = -1;
+	private boolean _hasOnlyRDD = false;
 	
 	private boolean _recompileRead = true;
 	
@@ -254,6 +255,14 @@ public class DataOp extends Hop {
 		return _paramIndexMap.get(name);
 	}
 	
+	public void setOnlyRDD(boolean flag) {
+		_hasOnlyRDD = flag;
+	}
+	
+	public boolean hasOnlyRDD() {
+		return _hasOnlyRDD;
+	}
+	
 	@Override
 	public boolean isGPUEnabled() {
 		return false;
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index 77e7aa8..7a6127c 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -1341,6 +1341,10 @@ public class Recompiler
 					d.setDim2(to.getNumColumns());
 					d.setNnz(to.getNnz());
 				}
+				if( dat instanceof CacheableData<?> ) {
+					CacheableData<?> cd = (CacheableData<?>) dat;
+					d.setOnlyRDD(!cd.isCached(true) && cd.getRDDHandle()!=null);
+				}
 			}
 		}
 		//special case for persistent reads with unknown size (read-after-write)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java
index 17741f8..b44c18b 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheBlock.java
@@ -99,6 +99,21 @@ public interface CacheBlock extends Writable
 	public CacheBlock slice(int rl, int ru, int cl, int cu, CacheBlock block);
 	
 	/**
+	 * Slice a sub block out of the current block and write into the given output block.
+	 * This method returns the passed instance if not null.
+	 * 
+	 * @param rl row lower
+	 * @param ru row upper
+	 * @param cl column lower
+	 * @param cu column upper
+	 * @param deep enforce deep-copy
+	 * @param block cache block
+	 * @return sub-block of cache block
+	 */
+	public CacheBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock block);
+	
+	
+	/**
 	 * Merge the given block into the current block. Both blocks needs to be of equal 
 	 * dimensions and contain disjoint non-zero cells.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 1f6c275..331e54f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -937,7 +937,7 @@ public class SparkExecutionContext extends ExecutionContext
 			int row_offset = (int)blockRow*mc.getBlocksize();
 			int col_offset = (int)blockCol*mc.getBlocksize();
 			block = mb.slice( row_offset, row_offset+maxRow-1,
-				col_offset, col_offset+maxCol-1, block );
+				col_offset, col_offset+maxCol-1, false, block );
 			//create key-value pair
 			return new Tuple2<>(new MatrixIndexes(blockRow+1, blockCol+1), block);
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java b/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java
index d2167c8..51c4280 100644
--- a/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/data/TensorBlock.java
@@ -285,6 +285,11 @@ public class TensorBlock implements CacheBlock, Externalizable {
 
 	@Override
 	public CacheBlock slice(int rl, int ru, int cl, int cu, CacheBlock block) {
+		return slice(rl, ru, cl, cu, false, block);
+	}
+	
+	@Override
+	public CacheBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock block) {
 		if( !(block instanceof TensorBlock) )
 			throw new RuntimeException("TensorBlock.slice(int,int,int,int,CacheBlock) CacheBlock was no TensorBlock");
 		TensorBlock tb = (TensorBlock) block;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java
index 4acd4c5..9f6bcc5 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/PartitionedBlock.java
@@ -76,7 +76,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 				int j = index % ncblks;
 				T tmp = (T) CacheBlockFactory.newInstance(code);
 				return block.slice(i * _blen, Math.min((i + 1) * _blen, rlen) - 1,
-					j * _blen, Math.min((j + 1) * _blen, clen) - 1, tmp);
+					j * _blen, Math.min((j + 1) * _blen, clen) - 1, false, tmp);
 			});
 		} catch(Exception ex) {
 			throw new RuntimeException("Failed partitioning of broadcast variable input.", ex);
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 8ee6f33..b2e0c24 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -1060,6 +1060,11 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 				(int)ixrange.colStart, (int)ixrange.colEnd, ret);
 	}
 
+	@Override
+	public FrameBlock slice(int rl, int ru, int cl, int cu, CacheBlock retCache) {
+		return slice(rl, ru, cl, cu, false, retCache);
+	}
+	
 	/**
 	 * Right indexing operations to slice a subframe out of this frame block.
 	 * Note that the existing column value types are preserved.
@@ -1068,11 +1073,12 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 	 * @param ru row upper index, inclusive, 0-based
 	 * @param cl column lower index, inclusive, 0-based
 	 * @param cu column upper index, inclusive, 0-based
+	 * @param deep enforce deep-copy
 	 * @param retCache cache block
 	 * @return frame block
 	 */
 	@Override
-	public FrameBlock slice(int rl, int ru, int cl, int cu, CacheBlock retCache) {
+	public FrameBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock retCache) {
 		FrameBlock ret = (FrameBlock)retCache;
 		// check the validity of bounds
 		if (   rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows()
@@ -1105,7 +1111,7 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 			ret._coldata = new Array[numCols];
 
 		//fast-path: shallow copy column indexing
-		if( ret._numRows == _numRows ) {
+		if( ret._numRows == _numRows && !deep ) {
 			//this shallow copy does not only avoid an array copy, but
 			//also allows for bi-directional reuses of recodemaps
 			for( int j=cl; j<=cu; j++ )
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 22b7e5c..77e949e 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -4012,6 +4012,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	 * @param ret output sliced out matrix block
 	 * @return matrix block output matrix block
 	 */
+	@Override
 	public MatrixBlock slice(int rl, int ru, int cl, int cu, boolean deep, CacheBlock ret) {
 		validateSliceArgument(rl, ru, cl, cu);
 		
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java
index 6484e0c..fa06668 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinGridSearchTest.java
@@ -44,6 +44,7 @@ public class BuiltinGridSearchTest extends AutomatedTestBase
 	
 	@Override
 	public void setUp() {
+		TestUtils.clearAssertionInformation();
 		addTestConfiguration(TEST_NAME1,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
 		addTestConfiguration(TEST_NAME2,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2,new String[]{"R"}));
 		addTestConfiguration(TEST_NAME3,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3,new String[]{"R"}));