You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/10/28 08:11:48 UTC

[3/3] systemml git commit: [SYSTEMML-1977] Fix codegen spark row ops w/ multiple rdd inputs

[SYSTEMML-1977] Fix codegen spark row ops w/ multiple rdd inputs

This patch fixes special cases of distributed codegen spark row
operations with multiple rdd inputs (i.e., in case of large side inputs
that cannot be broadcast). We now handle the metadata management at the
driver, which removes the implicit assumption that the relevant inputs
for B1 row types are available as broadcasts.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/118e3c0f
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/118e3c0f
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/118e3c0f

Branch: refs/heads/master
Commit: 118e3c0f630d3a3b30755ecb712672d79f8b8d7c
Parents: ede870d
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Oct 27 23:13:54 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Oct 27 23:13:54 2017 -0700

----------------------------------------------------------------------
 .../instructions/spark/SpoofSPInstruction.java    | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/118e3c0f/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index b34afad..eb74fed 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -166,7 +166,7 @@ public class SpoofSPInstruction extends SPInstruction {
 		}
 		else if(_class.getSuperclass() == SpoofMultiAggregate.class) //MAGG
 		{
-			SpoofMultiAggregate op = (SpoofMultiAggregate) CodegenUtils.createInstance(_class); 	
+			SpoofMultiAggregate op = (SpoofMultiAggregate) CodegenUtils.createInstance(_class);
 			AggOp[] aggOps = op.getAggOps();
 			
 			MatrixBlock tmpMB = in.mapToPair(new MultiAggregateFunction(
@@ -178,7 +178,7 @@ public class SpoofSPInstruction extends SPInstruction {
 		else if(_class.getSuperclass() == SpoofOuterProduct.class) //OUTER
 		{
 			if( _out.getDataType()==DataType.MATRIX ) {
-				SpoofOperator op = (SpoofOperator) CodegenUtils.createInstance(_class); 	
+				SpoofOperator op = (SpoofOperator) CodegenUtils.createInstance(_class);
 				OutProdType type = ((SpoofOuterProduct)op).getOuterProdType();
 
 				//update matrix characteristics
@@ -211,9 +211,11 @@ public class SpoofSPInstruction extends SPInstruction {
 				throw new DMLRuntimeException("Invalid spark rowwise operator w/ ncol=" + 
 					mcIn.getCols()+", ncolpb="+mcIn.getColsPerBlock()+".");
 			}
-			SpoofRowwise op = (SpoofRowwise) CodegenUtils.createInstance(_class); 	
+			SpoofRowwise op = (SpoofRowwise) CodegenUtils.createInstance(_class);
+			long clen2 = (op.getRowType()==RowType.NO_AGG_CONST) ? op.getConstDim2() :
+				op.getRowType().isRowTypeB1() ? sec.getMatrixCharacteristics(_in[1].getName()).getCols() : -1;
 			RowwiseFunction fmmc = new RowwiseFunction(_class.getName(),
-				_classBytes, bcVect2, bcMatrices, scalars, (int)mcIn.getCols());
+				_classBytes, bcVect2, bcMatrices, scalars, (int)mcIn.getCols(), (int)clen2);
 			out = in.mapPartitionsToPair(fmmc, op.getRowType()==RowType.ROW_AGG
 					|| op.getRowType() == RowType.NO_AGG);
 			
@@ -434,13 +436,15 @@ public class SpoofSPInstruction extends SPInstruction {
 		private static final long serialVersionUID = -7926980450209760212L;
 
 		private final int _clen;
+		private final int _clen2;
 		private SpoofRowwise _op = null;
 		
-		public RowwiseFunction(String className, byte[] classBytes, boolean[] bcInd, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars, int clen) 
+		public RowwiseFunction(String className, byte[] classBytes, boolean[] bcInd, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars, int clen, int clen2) 
 			throws DMLRuntimeException
 		{			
 			super(className, classBytes, bcInd, bcMatrices, scalars);
 			_clen = clen;
+			_clen2 = clen2; //driver-computed: const dim2, B1 side-input ncol, or -1
 		}
 		
 		@Override
@@ -454,9 +458,7 @@ public class SpoofSPInstruction extends SPInstruction {
 			}
 			
 			//setup local memory for reuse
-			int clen2 = (int) ((_op.getRowType()==RowType.NO_AGG_CONST) ? _op.getConstDim2() :
-				_op.getRowType().isRowTypeB1() ? _inputs.get(0).getNumCols() : -1);
-			LibSpoofPrimitives.setupThreadLocalMemory(_op.getNumIntermediates(), _clen, clen2);
+			LibSpoofPrimitives.setupThreadLocalMemory(_op.getNumIntermediates(), _clen, _clen2);
 			
 			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<>();
 			boolean aggIncr = (_op.getRowType().isColumnAgg() //aggregate entire partition