You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/28 20:26:09 UTC

incubator-systemml git commit: [SYSTEMML-965] Performance reduce-all rdd operations (in-place fold)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master c5f3e71a3 -> 07e94fb63


[SYSTEMML-965] Performance reduce-all rdd operations (in-place fold)

The standard binary 'reduce' RDD operation requires a deep copy of the
left input block into the result, because it is unknown whether the inputs
are partial aggregates or original blocks that are potentially consumed by
other operations. We now use 'fold' with an explicit zero element to avoid
these unnecessary deep copies (object allocation and copy). On dense and
sparse linregds scenarios, this showed improvements of up to 6%. Note that
this change applies to all instructions relying on the core primitives
sumStable or aggStable, as well as centralMoment and covariance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/07e94fb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/07e94fb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/07e94fb6

Branch: refs/heads/master
Commit: 07e94fb6309209b57c9a6f30e9c2ad5fd6ce8e2f
Parents: c5f3e71
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 28 00:31:12 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Sep 28 13:24:03 2016 -0700

----------------------------------------------------------------------
 .../spark/CentralMomentSPInstruction.java       | 12 +--
 .../spark/CovarianceSPInstruction.java          | 12 +--
 .../spark/data/CorrMatrixBlock.java             |  3 +-
 .../spark/utils/RDDAggregateUtils.java          | 77 +++++++++++---------
 4 files changed, 54 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/07e94fb6/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
index 4786348..33ba836 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CentralMomentSPInstruction.java
@@ -131,14 +131,14 @@ public class CentralMomentSPInstruction extends UnarySPInstruction
 		if( input3 == null ) //w/o weights
 		{
 			cmobj = in1.values().map(new RDDCMFunction(cop))
-			           .reduce(new RDDCMReduceFunction(cop));
+			           .fold(new CM_COV_Object(), new RDDCMReduceFunction(cop));
 		}
 		else //with weights
 		{
 			JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
 			cmobj = in1.join( in2 )
 					   .values().map(new RDDCMWeightsFunction(cop))
-			           .reduce(new RDDCMReduceFunction(cop));
+			           .fold(new CM_COV_Object(), new RDDCMReduceFunction(cop));
 		}
 
 		//create scalar output (no lineage information required)
@@ -209,13 +209,9 @@ public class CentralMomentSPInstruction extends UnarySPInstruction
 		public CM_COV_Object call(CM_COV_Object arg0, CM_COV_Object arg1) 
 			throws Exception 
 		{
-			CM_COV_Object out = new CM_COV_Object();
-			
 			//execute cm combine operations
-			_op.fn.execute(out, arg0);
-			_op.fn.execute(out, arg1);
-			
-			return out;
+			_op.fn.execute(arg0, arg1);		
+			return arg0;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/07e94fb6/src/main/java/org/apache/sysml/runtime/instructions/spark/CovarianceSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CovarianceSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CovarianceSPInstruction.java
index 10e0d56..50c359a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CovarianceSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CovarianceSPInstruction.java
@@ -111,7 +111,7 @@ public class CovarianceSPInstruction extends BinarySPInstruction
 		{
 			cmobj = in1.join( in2 )
 					   .values().map(new RDDCOVFunction(cop))
-			           .reduce(new RDDCOVReduceFunction(cop));
+			           .fold(new CM_COV_Object(), new RDDCOVReduceFunction(cop));
 		}
 		else //with weights
 		{
@@ -119,7 +119,7 @@ public class CovarianceSPInstruction extends BinarySPInstruction
 			cmobj = in1.join( in2 )
 					   .join( in3 )
 					   .values().map(new RDDCOVWeightsFunction(cop))
-			           .reduce(new RDDCOVReduceFunction(cop));
+			           .fold(new CM_COV_Object(), new RDDCOVReduceFunction(cop));
 		}
 
 		//create scalar output (no lineage information required)
@@ -196,13 +196,9 @@ public class CovarianceSPInstruction extends BinarySPInstruction
 		public CM_COV_Object call(CM_COV_Object arg0, CM_COV_Object arg1) 
 			throws Exception 
 		{
-			CM_COV_Object out = new CM_COV_Object();
-			
 			//execute cov combine operations
-			_op.fn.execute(out, arg0);
-			_op.fn.execute(out, arg1);
-			
-			return out;
+			_op.fn.execute(arg0, arg1);			
+			return arg0;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/07e94fb6/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
index 055f8a7..335901c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
@@ -64,9 +64,10 @@ public class CorrMatrixBlock implements Externalizable
 		return _corr;
 	}
 	
-	public void set(MatrixBlock value, MatrixBlock corr) {
+	public CorrMatrixBlock set(MatrixBlock value, MatrixBlock corr) {
 		_value = value;
 		_corr = corr;
+		return this;
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/07e94fb6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index abfcfbf..ad0c6f9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -58,11 +58,14 @@ public class RDDAggregateUtils
 		//stable sum of all blocks with correction block per function instance
 		if( TREE_AGGREGATION ) {
 			return in.values().treeReduce( 
-					new SumSingleBlockFunction() );	
+					new SumSingleBlockFunction(true) );	
 		}
 		else { //DEFAULT
-			return in.values().reduce( 
-					new SumSingleBlockFunction() );
+			//reduce-all aggregate via fold instead of reduce to allow 
+			//for update in-place w/o deep copy of left-hand-side blocks
+			return in.values().fold(
+					new MatrixBlock(), 
+					new SumSingleBlockFunction(false));
 		}
 	}
 	
@@ -141,7 +144,11 @@ public class RDDAggregateUtils
 	public static MatrixBlock aggStable( JavaPairRDD<MatrixIndexes, MatrixBlock> in, AggregateOperator aop )
 	{
 		//stable aggregate of all blocks with correction block per function instance
-		return in.values().reduce( 
+		
+		//reduce-all aggregate via fold instead of reduce to allow 
+		//for update in-place w/o deep copy of left-hand-side blocks
+		return in.values().fold(
+				new MatrixBlock(),
 				new AggregateSingleBlockFunction(aop) );
 	}
 	
@@ -256,8 +263,7 @@ public class RDDAggregateUtils
 			//aggregate other input and maintain corrections 
 			//(existing value and corr are used in place)
 			OperationsOnMatrixValues.incrementalAggregation(value, corr, arg1, _op, false);
-			arg0.set(value, corr);
-			return arg0;
+			return arg0.set(value, corr);
 		}	
 	}
 	
@@ -288,8 +294,7 @@ public class RDDAggregateUtils
 			//aggregate other input and maintain corrections
 			//(existing value and corr are used in place)
 			OperationsOnMatrixValues.incrementalAggregation(value1, corr, value2, _op, false);
-			arg0.set(value1, corr);
-			return arg0;
+			return arg0.set(value1, corr);
 		}	
 	}
 
@@ -528,27 +533,35 @@ public class RDDAggregateUtils
 		
 		private AggregateOperator _op = null;
 		private MatrixBlock _corr = null;
+		private boolean _deep = false;
 		
-		public SumSingleBlockFunction()
-		{
+		public SumSingleBlockFunction(boolean deep) {
 			_op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);	
-			_corr = null;
+			_deep = deep;
 		}
 		
 		@Override
 		public MatrixBlock call(MatrixBlock arg0, MatrixBlock arg1)
 			throws Exception 
 		{
+			//prepare combiner block
+			if( arg0.getNumRows() <= 0 || arg0.getNumColumns() <= 0 ) {
+				arg0.copy(arg1);
+				return arg0;
+			}
+			else if( arg1.getNumRows() <= 0 || arg1.getNumColumns() <= 0 ) {
+				return arg0;
+			}
+			
 			//create correction block (on demand)
-			if( _corr == null ){
+			if( _corr == null ) {
 				_corr = new MatrixBlock(arg0.getNumRows(), arg0.getNumColumns(), false);
 			}
 			
-			//copy one input to output
-			MatrixBlock out = new MatrixBlock(arg0);
-			
-			//aggregate other input
-			OperationsOnMatrixValues.incrementalAggregation(out, _corr, arg1, _op, false);
+			//aggregate other input (in-place if possible)
+			MatrixBlock out = _deep ? new MatrixBlock(arg0) : arg0;
+			OperationsOnMatrixValues.incrementalAggregation(
+					out, _corr, arg1, _op, false);
 			
 			return out;
 		}
@@ -601,35 +614,33 @@ public class RDDAggregateUtils
 		private AggregateOperator _op = null;
 		private MatrixBlock _corr = null;
 		
-		public AggregateSingleBlockFunction( AggregateOperator op )
-		{
+		public AggregateSingleBlockFunction( AggregateOperator op ) {
 			_op = op;	
-			_corr = null;
 		}
 		
 		@Override
 		public MatrixBlock call(MatrixBlock arg0, MatrixBlock arg1)
 			throws Exception 
 		{
-			//copy one first input
-			MatrixBlock out = new MatrixBlock(arg0); 
+			//prepare combiner block
+			if( arg0.getNumRows() <= 0 || arg0.getNumColumns() <= 0) {
+				arg0.copy(arg1);
+				return arg0;
+			}
+			else if( arg1.getNumRows() <= 0 || arg1.getNumColumns() <= 0 ) {
+				return arg0;
+			}
 			
 			//create correction block (on demand)
-			if( _corr == null ){
+			if( _op.correctionExists && _corr == null ) {
 				_corr = new MatrixBlock(arg0.getNumRows(), arg0.getNumColumns(), false);
 			}
 			
-			//aggregate second input
-			if(_op.correctionExists) {
-				OperationsOnMatrixValues.incrementalAggregation(
-						out, _corr, arg1, _op, true);
-			}
-			else {
-				OperationsOnMatrixValues.incrementalAggregation(
-						out, null, arg1, _op, true);
-			}
+			//aggregate second input (in-place)
+			OperationsOnMatrixValues.incrementalAggregation(
+				arg0, _op.correctionExists ? _corr : null, arg1, _op, true);
 			
-			return out;
+			return arg0;
 		}
 	}