You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/14 09:00:00 UTC

[1/2] systemml git commit: [SYSTEMML-2244] Fix handling of compressed blocks in few spark mm ops

Repository: systemml
Updated Branches:
  refs/heads/master ec0448850 -> 61925ab49


[SYSTEMML-2244] Fix handling of compressed blocks in few spark mm ops

This patch fixes the missing handling of compressed right-hand-side
blocks in the spark cpmm, rmm, zipmm, and tsmm2 instructions. Similar to
mapmm, tsmm, and mapmmchain, we now use a common primitive that internally
handles this case by invoking the aggregate binary operation on the
compressed rhs block.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5d149a0a
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5d149a0a
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5d149a0a

Branch: refs/heads/master
Commit: 5d149a0af2a0921581b702a0da62d79279b6aab8
Parents: ec04488
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Apr 14 01:54:39 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Apr 14 01:54:39 2018 -0700

----------------------------------------------------------------------
 .../runtime/instructions/spark/CpmmSPInstruction.java   |  8 +++++---
 .../runtime/instructions/spark/MapmmSPInstruction.java  | 12 ++++++------
 .../runtime/instructions/spark/RmmSPInstruction.java    |  5 +++--
 .../runtime/instructions/spark/Tsmm2SPInstruction.java  |  2 +-
 .../runtime/instructions/spark/ZipmmSPInstruction.java  |  4 +++-
 .../runtime/matrix/data/OperationsOnMatrixValues.java   |  7 +++----
 6 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/5d149a0a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
index 5c98964..de08d83 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
@@ -43,6 +43,7 @@ import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
@@ -203,10 +204,10 @@ public class CpmmSPInstruction extends BinarySPInstruction {
 			MatrixBlock blkIn1 = (MatrixBlock)arg0._2()._1().getValue();
 			MatrixBlock blkIn2 = (MatrixBlock)arg0._2()._2().getValue();
 			MatrixIndexes ixOut = new MatrixIndexes();
-			MatrixBlock blkOut = new MatrixBlock();
 			
 			//core block matrix multiplication 
-			blkIn1.aggregateBinaryOperations(blkIn1, blkIn2, blkOut, _op);
+			MatrixBlock blkOut = OperationsOnMatrixValues
+				.performAggregateBinaryIgnoreIndexes(blkIn1, blkIn2, new MatrixBlock(), _op);
 			
 			//return target block
 			ixOut.setIndexes(arg0._2()._1().getIndexes().getRowIndex(),
@@ -234,7 +235,8 @@ public class CpmmSPInstruction extends BinarySPInstruction {
 			MatrixBlock in2 = (MatrixBlock)arg0._2()
 				.reorgOperations(_rop, new MatrixBlock(), 0, 0, 0);
 			//core block matrix multiplication
-			return in1.aggregateBinaryOperations(in1, in2, new MatrixBlock(), _op);
+			return OperationsOnMatrixValues
+				.performAggregateBinaryIgnoreIndexes(in1, in2, new MatrixBlock(), _op);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/5d149a0a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index d43b6f8..d54ccf8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -327,8 +327,8 @@ public class MapmmSPInstruction extends BinarySPInstruction {
 				MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
 				
 				//execute matrix-vector mult
-				return (MatrixBlock) OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( 
-						left, blkIn, new MatrixBlock(), _op);						
+				return OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( 
+					left, blkIn, new MatrixBlock(), _op);
 			}
 			else //if( _type == CacheType.RIGHT )
 			{
@@ -336,8 +336,8 @@ public class MapmmSPInstruction extends BinarySPInstruction {
 				MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
 				
 				//execute matrix-vector mult
-				return (MatrixBlock) OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(
-						blkIn, right, new MatrixBlock(), _op);
+				return OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(
+					blkIn, right, new MatrixBlock(), _op);
 			}
 		}
 	}
@@ -392,7 +392,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
 					MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
 					
 					//execute index preserving matrix multiplication
-					left.aggregateBinaryOperations(left, blkIn, blkOut, _op);
+					OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(left, blkIn, blkOut, _op);
 				}
 				else //if( _type == CacheType.RIGHT )
 				{
@@ -400,7 +400,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
 					MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
 
 					//execute index preserving matrix multiplication
-					blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op);	
+					OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(blkIn, right, blkOut, _op);
 				}
 			
 				return new Tuple2<>(ixIn, blkOut);

http://git-wip-us.apache.org/repos/asf/systemml/blob/5d149a0a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
index 05f3870..294c142 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
@@ -43,6 +43,7 @@ import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.data.TripleIndexes;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
@@ -188,10 +189,10 @@ public class RmmSPInstruction extends BinarySPInstruction {
 			MatrixIndexes ixOut = new MatrixIndexes(ixIn.getFirstIndex(), ixIn.getSecondIndex()); //i,j
 			MatrixBlock blkIn1 = arg0._2()._1();
 			MatrixBlock blkIn2 = arg0._2()._2();
-			MatrixBlock blkOut = new MatrixBlock();
 			
 			//core block matrix multiplication 
-			blkIn1.aggregateBinaryOperations(blkIn1, blkIn2, blkOut, _op);
+			MatrixBlock blkOut = OperationsOnMatrixValues
+				.performAggregateBinaryIgnoreIndexes(blkIn1, blkIn2, new MatrixBlock(), _op);
 			
 			//output new tuple
 			return new Tuple2<>(ixOut, blkOut);

http://git-wip-us.apache.org/repos/asf/systemml/blob/5d149a0a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
index b5e8d87..cabc2c8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
@@ -215,7 +215,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
 						(int)(_type.isLeft()?1:ixin.getColumnIndex()));
 				MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm
 				
-				MatrixBlock out2 = (MatrixBlock) OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm
+				MatrixBlock out2 = OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm
 						_type.isLeft() ? mbin2t : mbin, _type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op);
 				
 				MatrixIndexes ixout2 = _type.isLeft() ? new MatrixIndexes(2,1) : new MatrixIndexes(1,2);

http://git-wip-us.apache.org/repos/asf/systemml/blob/5d149a0a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
index ec0b300..4f168c1 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
@@ -36,6 +36,7 @@ import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
@@ -124,7 +125,8 @@ public class ZipmmSPInstruction extends BinarySPInstruction {
 			MatrixBlock tmp = (MatrixBlock)in2.reorgOperations(_rop, new MatrixBlock(), 0, 0, 0);
 				
 			//core matrix multiplication (for t(y)%*%X or t(X)%*%y)
-			return tmp.aggregateBinaryOperations(tmp, in1, new MatrixBlock(), _abop);
+			return OperationsOnMatrixValues
+				.performAggregateBinaryIgnoreIndexes(tmp, in1, new MatrixBlock(), _abop);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/5d149a0a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 6b5b280..3715404 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -228,14 +228,13 @@ public class OperationsOnMatrixValues
 			value1.aggregateBinaryOperations(indexes1, value1, indexes2, value2, valueOut, op);
 	}
 
-	public static MatrixValue performAggregateBinaryIgnoreIndexes(MatrixBlock value1, MatrixBlock value2,
+	public static MatrixBlock performAggregateBinaryIgnoreIndexes(MatrixBlock value1, MatrixBlock value2,
 			MatrixBlock valueOut, AggregateBinaryOperator op) {
 		//perform on the value
 		if( value2 instanceof CompressedMatrixBlock )
-			value2.aggregateBinaryOperations(value1, value2, valueOut, op);
+			return value2.aggregateBinaryOperations(value1, value2, valueOut, op);
 		else
-			value1.aggregateBinaryOperations(value1, value2, valueOut, op);
-		return valueOut;
+			return value1.aggregateBinaryOperations(value1, value2, valueOut, op);
 	}
 
 	@SuppressWarnings("rawtypes")


[2/2] systemml git commit: [SYSTEMML-2243] Improved spark mapmm broadcast selection (worstcase nnz)

Posted by mb...@apache.org.
[SYSTEMML-2243] Improved spark mapmm broadcast selection (worstcase nnz)

This patch improves the operator selection logic for spark mapmm
(broadcast-based matrix multiply) operations. For equal input dimensions
and unknown nnz, we now also take the worst-case nnz estimates into
account, via the input memory estimates. This is important for
outer-product-like matrix multiplications with sparse inputs that are
extracted with indexing operations in the same DAG and hence have
unknown nnz metadata.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/61925ab4
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/61925ab4
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/61925ab4

Branch: refs/heads/master
Commit: 61925ab4912af57cdcbf7f293e91340f3127bf35
Parents: 5d149a0
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Apr 14 01:58:53 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Apr 14 01:58:53 2018 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/AggBinaryOp.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/61925ab4/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index 03a1bb6..9a286d2 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -223,7 +223,7 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 				//matrix mult operation selection part 3 (SPARK type)
 				boolean tmmRewrite = HopRewriteUtils.isTransposeOperation(input1);
 				_method = optFindMMultMethodSpark ( 
-						input1.getDim1(), input1.getDim2(), input1.getRowsInBlock(), input1.getColsInBlock(), input1.getNnz(),   
+						input1.getDim1(), input1.getDim2(), input1.getRowsInBlock(), input1.getColsInBlock(), input1.getNnz(),
 						input2.getDim1(), input2.getDim2(), input2.getRowsInBlock(), input2.getColsInBlock(), input2.getNnz(),
 						mmtsj, chain, _hasLeftPMInput, tmmRewrite );
 			
@@ -1687,7 +1687,10 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 			//apply map mult if one side fits in remote task memory 
 			//(if so pick smaller input for distributed cache)
 			//TODO relax requirement of valid CP dimensions once we support broadcast creation from files/RDDs
-			if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0
+			double em1Size = getInput().get(0).getOutputMemEstimate(); //w/ worst-case estimate
+			double em2Size = getInput().get(1).getOutputMemEstimate(); //w/ worst-case estimate
+			if( (m1SizeP < m2SizeP || (m1SizeP==m2SizeP && em1Size<em2Size) )
+				&& m1_rows>=0 && m1_cols>=0
 				&& OptimizerUtils.isValidCPDimensions(m1_rows, m1_cols) ) {
 				_spBroadcastMemEstimate = m1Size+m1SizeP;
 				return MMultMethod.MAPMM_L;