You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/22 00:27:04 UTC

[3/3] incubator-systemml git commit: [SYSTEMML-946] Improved shuffle-based data converters (avoid alloc)

[SYSTEMML-946] Improved shuffle-based data converters (avoid alloc)

This patch contains two improvements to our shuffle-based data
converters, both of which avoid unnecessary allocations and block copies
and hence reduce GC pressure and generally improve performance.

(1) Row sparsity estimates on dataframe-to-matrix conversion, which
affect the growth rate of sparse rows (2x vs 1.1x) and hence reduce
allocations.

(2) Improved mergeByKey primitive: we now use combineByKey instead of
reduceByKey to allocate a combiner block once and merge subsequent
blocks in-place without unnecessary allocation and copy. This applies to
all operations that rely on merging partial blocks. In a csv-to-matrix
conversion scenario (1k x 5M, 4 nodes), this improved performance from
1,214s to 443s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/38d087a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/38d087a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/38d087a7

Branch: refs/heads/master
Commit: 38d087a767a23c5f43a2a1ab45d69bf9bc1b1934
Parents: 80a72d7
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 21 17:26:22 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Sep 21 17:26:22 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDAggregateUtils.java          | 132 ++++++-------------
 .../spark/utils/RDDConverterUtils.java          |   4 +-
 2 files changed, 46 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index c545c30..93bb1d0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -100,7 +100,7 @@ public class RDDAggregateUtils
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
 		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-				in.combineByKey( new CreateBlockCombinerFunction(), 
+				in.combineByKey( new CreateCorrBlockCombinerFunction(), 
 							     new MergeSumBlockValueFunction(), 
 							     new MergeSumBlockCombinerFunction() );
 		
@@ -117,28 +117,6 @@ public class RDDAggregateUtils
 	 * @param in
 	 * @return
 	 */
-//	public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-//	{
-//		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
-//		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-//				in.combineByKey( new CreateBlockCombinerFunction(), 
-//							     new MergeSumBlockValueFunction(), 
-//							     new MergeSumBlockCombinerFunction(),
-//							     new BlockPartitioner(mc, in.partitions().size()));
-//		
-//		//strip-off correction blocks from 					     
-//		JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
-//				tmp.mapValues( new ExtractMatrixBlock() );
-//		
-//		//return the aggregate rdd
-//		return out;
-//	}
-	
-	/**
-	 * 
-	 * @param in
-	 * @return
-	 */
 	public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( JavaPairRDD<MatrixIndexes, Double> in )
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
@@ -191,7 +169,7 @@ public class RDDAggregateUtils
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
 		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-				in.combineByKey( new CreateBlockCombinerFunction(), 
+				in.combineByKey( new CreateCorrBlockCombinerFunction(), 
 							     new MergeAggBlockValueFunction(aop), 
 							     new MergeAggBlockCombinerFunction(aop) );
 		
@@ -204,30 +182,6 @@ public class RDDAggregateUtils
 	}
 	
 	/**
-	 * 
-	 * @param mc
-	 * @param in
-	 * @param aop
-	 * @return
-	 */
-//	public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in, AggregateOperator aop )
-//	{
-//		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
-//		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-//				in.combineByKey( new CreateBlockCombinerFunction(), 
-//							     new MergeAggBlockValueFunction(aop), 
-//							     new MergeAggBlockCombinerFunction(aop),
-//							     new BlockPartitioner(mc, in.partitions().size()));
-//		
-//		//strip-off correction blocks from 					     
-//		JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
-//				tmp.mapValues( new ExtractMatrixBlock() );
-//		
-//		//return the aggregate rdd
-//		return out;
-//	}
-	
-	/**
 	 * Merges disjoint data of all blocks per key.
 	 * 
 	 * Note: The behavior of this method is undefined for both sparse and dense data if the 
@@ -238,24 +192,14 @@ public class RDDAggregateUtils
 	 */
 	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
 	{
-		return in.reduceByKey(
-				new MergeBlocksFunction());
+		//use combine by key to avoid unnecessary deep block copies, i.e.
+		//create combiner block once and merge remaining blocks in-place.
+ 		return in.combineByKey( new CreateBlockCombinerFunction(), 
+			    new MergeBlocksFunction(false), 
+			    new MergeBlocksFunction(false) );
 	}
 	
 	/**
-	 * 
-	 * @param mc
-	 * @param in
-	 * @return
-	 */
-//	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-//	{
-//		return in.reduceByKey(
-//				new BlockPartitioner(mc, in.partitions().size()),
-//				new MergeBlocksFunction());
-//	}
-	
-	/**
 	 * Merges disjoint data of all blocks per key.
 	 * 
 	 * Note: The behavior of this method is undefined for both sparse and dense data if the 
@@ -268,13 +212,13 @@ public class RDDAggregateUtils
 	{
 		return in.combineByKey( new CreateRowBlockCombinerFunction(), 
 							    new MergeRowBlockValueFunction(), 
-							    new MergeRowBlockCombinerFunction() );
+							    new MergeBlocksFunction(false) );
 	}
 	
 	/**
 	 * 
 	 */
-	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, CorrMatrixBlock> 
+	private static class CreateCorrBlockCombinerFunction implements Function<MatrixBlock, CorrMatrixBlock> 
 	{
 		private static final long serialVersionUID = -3666451526776017343L;
 
@@ -349,6 +293,22 @@ public class RDDAggregateUtils
 	/**
 	 *
 	 */
+	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, MatrixBlock> 
+	{
+		private static final long serialVersionUID = 1987501624176848292L;
+
+		@Override
+		public MatrixBlock call(MatrixBlock arg0) 
+			throws Exception 
+		{
+			//create deep copy of given block
+			return new MatrixBlock(arg0);
+		}	
+	}
+	
+	/**
+	 *
+	 */
 	private static class CreateRowBlockCombinerFunction implements Function<RowMatrixBlock, MatrixBlock> 
 	{
 		private static final long serialVersionUID = 2866598914232118425L;
@@ -392,26 +352,6 @@ public class RDDAggregateUtils
 	/**
 	 * 
 	 */
-	private static class MergeRowBlockCombinerFunction implements Function2<MatrixBlock, MatrixBlock, MatrixBlock> 
-	{
-		private static final long serialVersionUID = 5142967296705548000L;
-
-		@Override
-		public MatrixBlock call(MatrixBlock arg0, MatrixBlock arg1) 
-			throws Exception 
-		{
-			//merge second matrix block into first
-			MatrixBlock out = arg0; //in-place update
-			out.merge(arg1, false);
-			out.examSparsity();
-			
-			return out;
-		}	
-	}
-	
-	/**
-	 * 
-	 */
 	private static class CreateCellCombinerFunction implements Function<Double, KahanObject> 
 	{
 		private static final long serialVersionUID = 3697505233057172994L;
@@ -736,11 +676,25 @@ public class RDDAggregateUtils
 	private static class MergeBlocksFunction implements Function2<MatrixBlock, MatrixBlock, MatrixBlock> 
 	{		
 		private static final long serialVersionUID = -8881019027250258850L;
-
+		private boolean _deep = false;
+		
+		@SuppressWarnings("unused")
+		public MergeBlocksFunction() {
+			//by default deep copy first argument
+			this(true); 
+		}
+		
+		public MergeBlocksFunction(boolean deep) {
+			_deep = deep;
+		}
+		
 		@Override
 		public MatrixBlock call(MatrixBlock b1, MatrixBlock b2) 
 			throws Exception 
 		{
+			long b1nnz = b1.getNonZeros();
+			long b2nnz = b2.getNonZeros();
+			
 			// sanity check input dimensions
 			if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
 				throw new DMLRuntimeException("Mismatched block sizes for: "
@@ -749,14 +703,14 @@ public class RDDAggregateUtils
 			}
 
 			// execute merge (never pass by reference)
-			MatrixBlock ret = new MatrixBlock(b1);
+			MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
 			ret.merge(b2, false);
 			ret.examSparsity();
 			
 			// sanity check output number of non-zeros
-			if (ret.getNonZeros() != b1.getNonZeros() + b2.getNonZeros()) {
+			if (ret.getNonZeros() != b1nnz + b2nnz) {
 				throw new DMLRuntimeException("Number of non-zeros does not match: "
-						+ ret.getNonZeros() + " != " + b1.getNonZeros() + " + " + b2.getNonZeros());
+						+ ret.getNonZeros() + " != " + b1nnz + " + " + b2nnz);
 			}
 
 			return ret;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index a619a4d..38ebd7e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -899,6 +899,7 @@ public class RDDConverterUtils
 		private long _clen = -1;
 		private int _brlen = -1;
 		private int _bclen = -1;
+		private double _sparsity = 1.0;
 		private boolean _sparse = false;
 		private boolean _containsID;
 		private boolean _isVector;
@@ -908,6 +909,7 @@ public class RDDConverterUtils
 			_clen = mc.getCols();
 			_brlen = mc.getRowsPerBlock();
 			_bclen = mc.getColsPerBlock();
+			_sparsity = OptimizerUtils.getSparsity(mc);
 			_sparse = sparse;
 			_containsID = containsID;
 			_isVector = isVector;
@@ -976,7 +978,7 @@ public class RDDConverterUtils
 			for( int cix=1; cix<=ncblks; cix++ ) {
 				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
 				ix[cix-1] = new MatrixIndexes(rix, cix);
-				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse);		
+				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse,(int)(lrlen*lclen*_sparsity));
 			}
 		}