You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/09/27 19:19:34 UTC

[2/2] systemml git commit: [SYSTEMML-2495] Adjust spark cumulative aggregate partitions

[SYSTEMML-2495] Adjust spark cumulative aggregate partitions

This patch improves the robustness of Spark cumulative aggregates by
adjusting the number of partitions for the intermediates of the forward
pass, because their data size can shrink significantly but also grow.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0c4a3611
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0c4a3611
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0c4a3611

Branch: refs/heads/master
Commit: 0c4a3611c316cb13c7eaa94facd3446b34c1090e
Parents: 069863f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Sep 27 21:17:23 2018 +0200
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu Sep 27 21:17:23 2018 +0200

----------------------------------------------------------------------
 .../spark/CumulativeAggregateSPInstruction.java       | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0c4a3611/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index 8514acc..68cc6db 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.PlusMultiply;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -59,9 +60,11 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
 	public void processInstruction(ExecutionContext ec) {
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 		MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName());
+		MatrixCharacteristics mcOut = new MatrixCharacteristics(mc);
 		long rlen = mc.getRows();
 		int brlen = mc.getRowsPerBlock();
 		int bclen = mc.getColsPerBlock();
+		mcOut.setRows((long)(Math.ceil((double)rlen/brlen)));
 		
 		//get input
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
@@ -69,12 +72,17 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
 		//execute unary aggregate (w/ implicit drop correction)
 		AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
-				in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
-		out = RDDAggregateUtils.mergeByKey(out, false);
+			in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen));
+		//merge partial aggregates, adjusting for correct number of partitions
+		//as size can significant shrink (1K) but also grow (sparse-dense)
+		int numParts = SparkUtils.getNumPreferredPartitions(mcOut);
+		int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
+		out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false);
 		
 		//put output handle in symbol table
-		sec.setRDDHandleForVariable(output.getName(), out);	
+		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.getMatrixCharacteristics(output.getName()).set(mcOut);
 	}
 
 	private static class RDDCumAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>