You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/09/27 16:57:41 UTC

systemml git commit: [SYSTEMML-2492] New broadcast-based spark cumulative aggregates

Repository: systemml
Updated Branches:
  refs/heads/master 59dfb50ea -> 8005f2546


[SYSTEMML-2492] New broadcast-based spark cumulative aggregates

This patch improves the performance of Spark cumulative aggregates such
as cumsum(X). We now broadcast the offsets in the backwards cascade
whenever possible to avoid shuffling the data input during a repartition
join (unless there exists a partitioner on the main input).

On a 10-node cluster, this patch improved the end-to-end performance of
10 calls of print(sum(cumsum(X))) (without rewrite) over a dense
10M x 2K input (160GB) from 2011s to 232s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8005f254
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8005f254
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8005f254

Branch: refs/heads/master
Commit: 8005f2546862aae73fcd371cd6939daae22994f0
Parents: 59dfb50
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Sep 27 18:57:23 2018 +0200
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu Sep 27 18:57:23 2018 +0200

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |  2 +-
 .../java/org/apache/sysml/hops/UnaryOp.java     |  7 +-
 .../sysml/lops/CumulativeOffsetBinary.java      |  9 ++-
 .../sysml/lops/CumulativePartialAggregate.java  |  1 -
 .../spark/CumulativeOffsetSPInstruction.java    | 71 ++++++++++++++++----
 5 files changed, 71 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/8005f254/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index af2eae3..a384b87 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -53,7 +53,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 public class ParameterizedBuiltinOp extends MultiThreadedHop
 {
 	public static boolean FORCE_DIST_RM_EMPTY = false;
-
+	
 	//operator type
 	private ParamBuiltinOp _op;
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/8005f254/src/main/java/org/apache/sysml/hops/UnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java b/src/main/java/org/apache/sysml/hops/UnaryOp.java
index 86f935f..0e7563f 100644
--- a/src/main/java/org/apache/sysml/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java
@@ -474,7 +474,7 @@ public class UnaryOp extends MultiThreadedHop
 		
 		//in-memory cum sum (of partial aggregates)
 		if( TEMP.getOutputParameters().getNumRows()!=1 ){
-			int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );					
+			int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );
 			Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k);
 			unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(), clen, brlen, bclen, -1);
 			setLineNumbers(unary1);
@@ -486,8 +486,11 @@ public class UnaryOp extends MultiThreadedHop
 			//(for spark, the CumulativeOffsetBinary subsumes both the split aggregate and 
 			//the subsequent offset binary apply of split aggregates against the original data)
 			double initValue = getCumulativeInitValue();
+			boolean broadcast = OptimizerUtils.checkSparkBroadcastMemoryBudget(OptimizerUtils.estimateSize(
+				TEMP.getOutputParameters().getNumRows(), TEMP.getOutputParameters().getNumCols()));
+			
 			CumulativeOffsetBinary binary = new CumulativeOffsetBinary(DATA.get(level), TEMP, 
-					DataType.MATRIX, ValueType.DOUBLE, initValue, aggtype, ExecType.SPARK);
+					DataType.MATRIX, ValueType.DOUBLE, initValue, broadcast, aggtype, ExecType.SPARK);
 			binary.getOutputParameters().setDimensions(rlen, clen, brlen, bclen, -1);
 			setLineNumbers(binary);
 			TEMP = binary;

http://git-wip-us.apache.org/repos/asf/systemml/blob/8005f254/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java b/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
index 78443e0..40f59fc 100644
--- a/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
+++ b/src/main/java/org/apache/sysml/lops/CumulativeOffsetBinary.java
@@ -27,9 +27,11 @@ import org.apache.sysml.parser.Expression.*;
 
 public class CumulativeOffsetBinary extends Lop 
 {
+	private static final boolean ALLOW_BROADCAST = true;
 	
 	private OperationTypes _op;
 	private double _initValue = 0;
+	private boolean _broadcast = false;
 	
 	public CumulativeOffsetBinary(Lop data, Lop offsets, DataType dt, ValueType vt, OperationTypes op, ExecType et) 
 	{
@@ -40,7 +42,7 @@ public class CumulativeOffsetBinary extends Lop
 		init(data, offsets, dt, vt, et);
 	}
 	
-	public CumulativeOffsetBinary(Lop data, Lop offsets, DataType dt, ValueType vt, double init, OperationTypes op, ExecType et)
+	public CumulativeOffsetBinary(Lop data, Lop offsets, DataType dt, ValueType vt, double init, boolean broadcast, OperationTypes op, ExecType et)
 	{
 		super(Lop.Type.CumulativeOffsetBinary, dt, vt);
 		checkSupportedOperations(op);
@@ -48,6 +50,7 @@ public class CumulativeOffsetBinary extends Lop
 		
 		//in case of Spark, CumulativeOffset includes CumulativeSplit and hence needs the init value
 		_initValue = init;
+		_broadcast = ALLOW_BROADCAST && broadcast;
 		
 		init(data, offsets, dt, vt, et);
 	}
@@ -129,7 +132,9 @@ public class CumulativeOffsetBinary extends Lop
 		
 		if( getExecType() == ExecType.SPARK ) {
 			sb.append( OPERAND_DELIMITOR );
-			sb.append( _initValue );	
+			sb.append( _initValue );
+			sb.append( OPERAND_DELIMITOR );
+			sb.append( _broadcast );
 		}
 		
 		return sb.toString();

http://git-wip-us.apache.org/repos/asf/systemml/blob/8005f254/src/main/java/org/apache/sysml/lops/CumulativePartialAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/CumulativePartialAggregate.java b/src/main/java/org/apache/sysml/lops/CumulativePartialAggregate.java
index f50bf0c..ac42471 100644
--- a/src/main/java/org/apache/sysml/lops/CumulativePartialAggregate.java
+++ b/src/main/java/org/apache/sysml/lops/CumulativePartialAggregate.java
@@ -27,7 +27,6 @@ import org.apache.sysml.parser.Expression.*;
 
 public class CumulativePartialAggregate extends Lop 
 {
-	
 	private OperationTypes _op;
 	
 	public CumulativePartialAggregate(Lop input, DataType dt, ValueType vt, OperationTypes op, ExecType et) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/8005f254/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 36f02b1..c32a57b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
@@ -36,19 +37,22 @@ import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.functionobjects.PlusMultiply;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 	private BinaryOperator _bop = null;
 	private UnaryOperator _uop = null;
-	private double _initValue = 0;
+	private final double _initValue ;
+	private final boolean _broadcast;
 
-	private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, double init, String opcode, String istr) {
+	private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, double init, boolean broadcast, String opcode, String istr) {
 		super(SPType.CumsumOffset, op, in1, in2, out, opcode, istr);
 
 		if ("bcumoffk+".equals(opcode)) {
@@ -73,17 +77,19 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 		}
 
 		_initValue = init;
+		_broadcast = broadcast;
 	}
 
 	public static CumulativeOffsetSPInstruction parseInstruction ( String str ) {
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType( str );
-		InstructionUtils.checkNumFields ( parts, 4 );
+		InstructionUtils.checkNumFields(parts, 5);
 		String opcode = parts[0];
 		CPOperand in1 = new CPOperand(parts[1]);
 		CPOperand in2 = new CPOperand(parts[2]);
 		CPOperand out = new CPOperand(parts[3]);
 		double init = Double.parseDouble(parts[4]);
-		return new CumulativeOffsetSPInstruction(null, in1, in2, out, init, opcode, str);
+		boolean broadcast = Boolean.parseBoolean(parts[5]);
+		return new CumulativeOffsetSPInstruction(null, in1, in2, out, init, broadcast, opcode, str);
 	}
 
 	@Override
@@ -94,16 +100,25 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 		long rlen = mc2.getRows();
 		int brlen = mc2.getRowsPerBlock();
 		
-		//get inputs
-		JavaPairRDD<MatrixIndexes,MatrixBlock> inData = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-		JavaPairRDD<MatrixIndexes,MatrixBlock> inAgg = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
+		//get and join inputs
+		JavaPairRDD<MatrixIndexes,MatrixBlock> inData = sec.getBinaryBlockRDDHandleForVariable(input1.getName());
+		JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> joined = null;
 		
-		//prepare aggregates (cumsplit of offsets)
-		inAgg = inAgg.flatMapToPair(new RDDCumSplitFunction(_initValue, rlen, brlen));
+		if( _broadcast ) {
+			//broadcast offsets and broadcast join with data
+			PartitionedBroadcast<MatrixBlock> inAgg = sec.getBroadcastForVariable(input2.getName());
+			joined = inData.mapToPair(new RDDCumSplitLookupFunction(inAgg,_initValue, rlen, brlen));
+		}
+		else {
+			//prepare aggregates (cumsplit of offsets) and repartition join with data
+			joined = inData.join(sec
+				.getBinaryBlockRDDHandleForVariable(input2.getName())
+				.flatMapToPair(new RDDCumSplitFunction(_initValue, rlen, brlen)));
+		}
 		
 		//execute cumulative offset (apply cumulative op w/ offsets)
-		JavaPairRDD<MatrixIndexes,MatrixBlock> out = inData
-			.join( inAgg ).mapValues(new RDDCumOffsetFunction(_uop, _bop));
+		JavaPairRDD<MatrixIndexes,MatrixBlock> out = joined
+			.mapValues(new RDDCumOffsetFunction(_uop, _bop));
 		
 		//put output handle in symbol table
 		if( _bop.fn instanceof PlusMultiply )
@@ -111,9 +126,9 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 				.set(mc1.getRows(), 1, mc1.getRowsPerBlock(), mc1.getColsPerBlock());
 		else //general case
 			updateUnaryOutputMatrixCharacteristics(sec);
-		sec.setRDDHandleForVariable(output.getName(), out);	
+		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
-		sec.addLineageRDD(output.getName(), input2.getName());
+		sec.addLineage(output.getName(), input2.getName(), _broadcast);
 	}
 
 	private static class RDDCumSplitFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
@@ -168,6 +183,36 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 			return ret.iterator();
 		}
 	}
+	
+	private static class RDDCumSplitLookupFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, Tuple2<MatrixBlock,MatrixBlock>> 
+	{
+		private static final long serialVersionUID = -2785629043886477479L;
+		
+		private final PartitionedBroadcast<MatrixBlock> _pbc;
+		private final double _initValue;
+		private final int _brlen;
+		
+		public RDDCumSplitLookupFunction(PartitionedBroadcast<MatrixBlock> pbc, double initValue, long rlen, int brlen) {
+			_pbc = pbc;
+			_initValue = initValue;
+			_brlen = brlen;
+		}
+		
+		@Override
+		public Tuple2<MatrixIndexes, Tuple2<MatrixBlock,MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception {
+			MatrixIndexes ixIn = arg0._1();
+			MatrixBlock blkIn = arg0._2();
+			
+			//compute block and row indexes
+			long brix = UtilFunctions.computeBlockIndex(ixIn.getRowIndex()-1, _brlen);
+			int rix = UtilFunctions.computeCellInBlock(ixIn.getRowIndex()-1, _brlen);
+			
+			//lookup offset row and return joined output
+			MatrixBlock off = (ixIn.getRowIndex() == 1) ? new MatrixBlock(1, blkIn.getNumColumns(), _initValue) :
+				_pbc.getBlock((int)brix, (int)ixIn.getColumnIndex()).slice(rix, rix);
+			return new Tuple2<MatrixIndexes, Tuple2<MatrixBlock,MatrixBlock>>(ixIn, new Tuple2<>(blkIn,off));
+		}
+	}
 
 	private static class RDDCumOffsetFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> 
 	{