You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/09/27 19:19:33 UTC

[1/2] systemml git commit: [SYSTEMML-2494] Caching in multi-level spark cumulative aggregates

Repository: systemml
Updated Branches:
  refs/heads/master 8005f2546 -> 0c4a3611c


[SYSTEMML-2494] Caching in multi-level spark cumulative aggregates

This patch adds optional caching for multi-level spark cumulative
aggregates, where we cache intermediate aggregates of the forward pass
to avoid unnecessary lazy evaluation of previous levels on the backwards
pass.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/069863f7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/069863f7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/069863f7

Branch: refs/heads/master
Commit: 069863f7f95f8da1f8aa8c366d8f32dbede28a8b
Parents: 8005f25
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Sep 27 20:36:36 2018 +0200
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu Sep 27 20:36:36 2018 +0200

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/UnaryOp.java     | 15 ++++++++++++++-
 .../apache/sysml/lops/CumulativeOffsetBinary.java    |  4 +---
 .../java/org/apache/sysml/lops/OutputParameters.java |  7 +++++++
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/069863f7/src/main/java/org/apache/sysml/hops/UnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java b/src/main/java/org/apache/sysml/hops/UnaryOp.java
index 0e7563f..d1110c3 100644
--- a/src/main/java/org/apache/sysml/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.Aggregate;
+import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.lops.Aggregate.OperationTypes;
 import org.apache.sysml.lops.CombineUnary;
 import org.apache.sysml.lops.CumulativeOffsetBinary;
@@ -49,6 +50,9 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 
 public class UnaryOp extends MultiThreadedHop
 {
+	private static final boolean ALLOW_CUMAGG_BROADCAST = true;
+	private static final boolean ALLOW_CUMAGG_CACHING = false;
+	
 	private OpOp1 _op = null;
 	
 	private UnaryOp() {
@@ -439,6 +443,7 @@ public class UnaryOp extends MultiThreadedHop
 		return TEMP;
 	}
 
+	@SuppressWarnings("unused")
 	private Lop constructLopsSparkCumulativeUnary() 
 	{
 		Hop input = getInput().get(0);
@@ -458,6 +463,13 @@ public class UnaryOp extends MultiThreadedHop
 		while( ((2*OptimizerUtils.estimateSize(TEMP.getOutputParameters().getNumRows(), clen) + OptimizerUtils.estimateSize(1, clen)) 
 			> OptimizerUtils.getLocalMemBudget() && TEMP.getOutputParameters().getNumRows()>1) || force )
 		{
+			//caching within multi-level cascades
+			if( ALLOW_CUMAGG_CACHING && level > 0 ) {
+				Lop oldTEMP = TEMP;
+				TEMP = new Checkpoint(oldTEMP, getDataType(), getValueType(), Checkpoint.getDefaultStorageLevelString());
+				TEMP.getOutputParameters().setDimensions(oldTEMP.getOutputParameters());
+				setLineNumbers(TEMP);
+			}
 			DATA.add(TEMP);
 	
 			//preaggregation per block (for spark, the CumulativePartialAggregate subsumes both
@@ -486,7 +498,8 @@ public class UnaryOp extends MultiThreadedHop
 			//(for spark, the CumulativeOffsetBinary subsumes both the split aggregate and 
 			//the subsequent offset binary apply of split aggregates against the original data)
 			double initValue = getCumulativeInitValue();
-			boolean broadcast = OptimizerUtils.checkSparkBroadcastMemoryBudget(OptimizerUtils.estimateSize(
+			boolean broadcast = ALLOW_CUMAGG_BROADCAST
+				&& OptimizerUtils.checkSparkBroadcastMemoryBudget(OptimizerUtils.estimateSize(
 				TEMP.getOutputParameters().getNumRows(), TEMP.getOutputParameters().getNumCols()));
 			
 			CumulativeOffsetBinary binary = new CumulativeOffsetBinary(DATA.get(level), TEMP, 

http://git-wip-us.apache.org/repos/asf/systemml/blob/069863f7/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java b/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
index 40f59fc..ceeb254 100644
--- a/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
+++ b/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
@@ -27,8 +27,6 @@ import org.apache.sysml.parser.Expression.*;
 
 public class CumulativeOffsetBinary extends Lop 
 {
-	private static final boolean ALLOW_BROADCAST = true;
-	
 	private OperationTypes _op;
 	private double _initValue = 0;
 	private boolean _broadcast = false;
@@ -50,7 +48,7 @@ public class CumulativeOffsetBinary extends Lop
 		
 		//in case of Spark, CumulativeOffset includes CumulativeSplit and hence needs the init value
 		_initValue = init;
-		_broadcast = ALLOW_BROADCAST && broadcast;
+		_broadcast = broadcast;
 		
 		init(data, offsets, dt, vt, et);
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/069863f7/src/main/java/org/apache/sysml/lops/OutputParameters.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/OutputParameters.java b/src/main/java/org/apache/sysml/lops/OutputParameters.java
index c34157e..848c1db 100644
--- a/src/main/java/org/apache/sysml/lops/OutputParameters.java
+++ b/src/main/java/org/apache/sysml/lops/OutputParameters.java
@@ -87,6 +87,13 @@ public class OutputParameters
 		setDimensions(rows, cols, rows_per_block, cols_per_block, nnz);
 	}
 	
+	public void setDimensions(OutputParameters input) {
+		_num_rows = input._num_rows;
+		_num_cols = input._num_cols;
+		_num_rows_in_block = input._num_rows_in_block;
+		_num_cols_in_block = input._num_cols_in_block;
+	}
+	
 	public Format getFormat() {
 		return matrix_format;
 	}


[2/2] systemml git commit: [SYSTEMML-2495] Adjust spark cumulative aggregate partitions

Posted by mb...@apache.org.
[SYSTEMML-2495] Adjust spark cumulative aggregate partitions

This patch improves the robustness of spark cumulative aggregates by
adjusting the number of partitions for intermediates of the forward pass
because this data size can significantly shrink also grow.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0c4a3611
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0c4a3611
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0c4a3611

Branch: refs/heads/master
Commit: 0c4a3611c316cb13c7eaa94facd3446b34c1090e
Parents: 069863f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Sep 27 21:17:23 2018 +0200
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu Sep 27 21:17:23 2018 +0200

----------------------------------------------------------------------
 .../spark/CumulativeAggregateSPInstruction.java       | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0c4a3611/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index 8514acc..68cc6db 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.PlusMultiply;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -59,9 +60,11 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
 	public void processInstruction(ExecutionContext ec) {
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 		MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName());
+		MatrixCharacteristics mcOut = new MatrixCharacteristics(mc);
 		long rlen = mc.getRows();
 		int brlen = mc.getRowsPerBlock();
 		int bclen = mc.getColsPerBlock();
+		mcOut.setRows((long)(Math.ceil((double)rlen/brlen)));
 		
 		//get input
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
@@ -69,12 +72,17 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
 		//execute unary aggregate (w/ implicit drop correction)
 		AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
-				in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
-		out = RDDAggregateUtils.mergeByKey(out, false);
+			in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
+		//merge partial aggregates, adjusting for correct number of partitions
+		//as size can significant shrink (1K) but also grow (sparse-dense)
+		int numParts = SparkUtils.getNumPreferredPartitions(mcOut);
+		int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
+		out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false);
 		
 		//put output handle in symbol table
-		sec.setRDDHandleForVariable(output.getName(), out);	
+		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.getMatrixCharacteristics(output.getName()).set(mcOut);
 	}
 
 	private static class RDDCumAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>