You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/22 00:27:02 UTC

[1/3] incubator-systemml git commit: Minor improvement of parfor eager caching (avoid count if cached)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 5decbe64b -> 38d087a76


Minor improvement of parfor eager caching (avoid count if cached)

This patch avoids the count (as used for eager caching to avoid
contention of concurrent jobs) if the rdd is already cached.
This improvement primarily applies to very large data that exceeds
aggregated memory, where the count constitutes an expensive disk-based
scan over all partitions. 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0750f35a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0750f35a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0750f35a

Branch: refs/heads/master
Commit: 0750f35a072694f1540ce91d3117ff478b2caf96
Parents: 5decbe6
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Tue Sep 20 22:55:54 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 20 22:55:54 2016 -0700

----------------------------------------------------------------------
 .../runtime/controlprogram/context/SparkExecutionContext.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0750f35a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 02c900d..f95e0d4 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1281,8 +1281,9 @@ public class SparkExecutionContext extends ExecutionContext
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
 				getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
 		
-		//persist rdd (force rdd caching)
-		in.count(); //trigger caching to prevent contention			       
+		//persist rdd (force rdd caching, if not already cached)
+		if( !isRDDCached(in.id()) )
+			in.count(); //trigger caching to prevent contention			       
 	}
 	
 	/**


[3/3] incubator-systemml git commit: [SYSTEMML-946] Improved shuffle-based data converters (avoid alloc)

Posted by mb...@apache.org.
[SYSTEMML-946] Improved shuffle-based data converters (avoid alloc)

This patch contains two improvements of our shuffle-based data
converters, which both avoid unnecessary allocations and block copies
and hence reduce GC pressure and generally improve performance. 

(1) Row sparsity estimates on dataframe - matrix conversion, which
affects the growth rate of sparse rows (2x vs 1.1x, and hence reduces
allocations)

(2) Improved mergeByKey primitive: we now use combineByKey instead of
reduceByKey to allocate a combiner block once and merge subsequent
blocks in-place without unnecessary allocation and copy. This applies to
all operations which rely on merging partial blocks. On a scenario of
csv-matrix conversion (1k x 5M, 4 nodes), this improved performance from
1,214s to 443s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/38d087a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/38d087a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/38d087a7

Branch: refs/heads/master
Commit: 38d087a767a23c5f43a2a1ab45d69bf9bc1b1934
Parents: 80a72d7
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 21 17:26:22 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Sep 21 17:26:22 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDAggregateUtils.java          | 132 ++++++-------------
 .../spark/utils/RDDConverterUtils.java          |   4 +-
 2 files changed, 46 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index c545c30..93bb1d0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -100,7 +100,7 @@ public class RDDAggregateUtils
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
 		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-				in.combineByKey( new CreateBlockCombinerFunction(), 
+				in.combineByKey( new CreateCorrBlockCombinerFunction(), 
 							     new MergeSumBlockValueFunction(), 
 							     new MergeSumBlockCombinerFunction() );
 		
@@ -117,28 +117,6 @@ public class RDDAggregateUtils
 	 * @param in
 	 * @return
 	 */
-//	public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-//	{
-//		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
-//		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-//				in.combineByKey( new CreateBlockCombinerFunction(), 
-//							     new MergeSumBlockValueFunction(), 
-//							     new MergeSumBlockCombinerFunction(),
-//							     new BlockPartitioner(mc, in.partitions().size()));
-//		
-//		//strip-off correction blocks from 					     
-//		JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
-//				tmp.mapValues( new ExtractMatrixBlock() );
-//		
-//		//return the aggregate rdd
-//		return out;
-//	}
-	
-	/**
-	 * 
-	 * @param in
-	 * @return
-	 */
 	public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( JavaPairRDD<MatrixIndexes, Double> in )
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
@@ -191,7 +169,7 @@ public class RDDAggregateUtils
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
 		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-				in.combineByKey( new CreateBlockCombinerFunction(), 
+				in.combineByKey( new CreateCorrBlockCombinerFunction(), 
 							     new MergeAggBlockValueFunction(aop), 
 							     new MergeAggBlockCombinerFunction(aop) );
 		
@@ -204,30 +182,6 @@ public class RDDAggregateUtils
 	}
 	
 	/**
-	 * 
-	 * @param mc
-	 * @param in
-	 * @param aop
-	 * @return
-	 */
-//	public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in, AggregateOperator aop )
-//	{
-//		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
-//		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-//				in.combineByKey( new CreateBlockCombinerFunction(), 
-//							     new MergeAggBlockValueFunction(aop), 
-//							     new MergeAggBlockCombinerFunction(aop),
-//							     new BlockPartitioner(mc, in.partitions().size()));
-//		
-//		//strip-off correction blocks from 					     
-//		JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
-//				tmp.mapValues( new ExtractMatrixBlock() );
-//		
-//		//return the aggregate rdd
-//		return out;
-//	}
-	
-	/**
 	 * Merges disjoint data of all blocks per key.
 	 * 
 	 * Note: The behavior of this method is undefined for both sparse and dense data if the 
@@ -238,24 +192,14 @@ public class RDDAggregateUtils
 	 */
 	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
 	{
-		return in.reduceByKey(
-				new MergeBlocksFunction());
+		//use combine by key to avoid unnecessary deep block copies, i.e.
+		//create combiner block once and merge remaining blocks in-place.
+ 		return in.combineByKey( new CreateBlockCombinerFunction(), 
+			    new MergeBlocksFunction(false), 
+			    new MergeBlocksFunction(false) );
 	}
 	
 	/**
-	 * 
-	 * @param mc
-	 * @param in
-	 * @return
-	 */
-//	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-//	{
-//		return in.reduceByKey(
-//				new BlockPartitioner(mc, in.partitions().size()),
-//				new MergeBlocksFunction());
-//	}
-	
-	/**
 	 * Merges disjoint data of all blocks per key.
 	 * 
 	 * Note: The behavior of this method is undefined for both sparse and dense data if the 
@@ -268,13 +212,13 @@ public class RDDAggregateUtils
 	{
 		return in.combineByKey( new CreateRowBlockCombinerFunction(), 
 							    new MergeRowBlockValueFunction(), 
-							    new MergeRowBlockCombinerFunction() );
+							    new MergeBlocksFunction(false) );
 	}
 	
 	/**
 	 * 
 	 */
-	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, CorrMatrixBlock> 
+	private static class CreateCorrBlockCombinerFunction implements Function<MatrixBlock, CorrMatrixBlock> 
 	{
 		private static final long serialVersionUID = -3666451526776017343L;
 
@@ -349,6 +293,22 @@ public class RDDAggregateUtils
 	/**
 	 *
 	 */
+	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, MatrixBlock> 
+	{
+		private static final long serialVersionUID = 1987501624176848292L;
+
+		@Override
+		public MatrixBlock call(MatrixBlock arg0) 
+			throws Exception 
+		{
+			//create deep copy of given block
+			return new MatrixBlock(arg0);
+		}	
+	}
+	
+	/**
+	 *
+	 */
 	private static class CreateRowBlockCombinerFunction implements Function<RowMatrixBlock, MatrixBlock> 
 	{
 		private static final long serialVersionUID = 2866598914232118425L;
@@ -392,26 +352,6 @@ public class RDDAggregateUtils
 	/**
 	 * 
 	 */
-	private static class MergeRowBlockCombinerFunction implements Function2<MatrixBlock, MatrixBlock, MatrixBlock> 
-	{
-		private static final long serialVersionUID = 5142967296705548000L;
-
-		@Override
-		public MatrixBlock call(MatrixBlock arg0, MatrixBlock arg1) 
-			throws Exception 
-		{
-			//merge second matrix block into first
-			MatrixBlock out = arg0; //in-place update
-			out.merge(arg1, false);
-			out.examSparsity();
-			
-			return out;
-		}	
-	}
-	
-	/**
-	 * 
-	 */
 	private static class CreateCellCombinerFunction implements Function<Double, KahanObject> 
 	{
 		private static final long serialVersionUID = 3697505233057172994L;
@@ -736,11 +676,25 @@ public class RDDAggregateUtils
 	private static class MergeBlocksFunction implements Function2<MatrixBlock, MatrixBlock, MatrixBlock> 
 	{		
 		private static final long serialVersionUID = -8881019027250258850L;
-
+		private boolean _deep = false;
+		
+		@SuppressWarnings("unused")
+		public MergeBlocksFunction() {
+			//by default deep copy first argument
+			this(true); 
+		}
+		
+		public MergeBlocksFunction(boolean deep) {
+			_deep = deep;
+		}
+		
 		@Override
 		public MatrixBlock call(MatrixBlock b1, MatrixBlock b2) 
 			throws Exception 
 		{
+			long b1nnz = b1.getNonZeros();
+			long b2nnz = b2.getNonZeros();
+			
 			// sanity check input dimensions
 			if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
 				throw new DMLRuntimeException("Mismatched block sizes for: "
@@ -749,14 +703,14 @@ public class RDDAggregateUtils
 			}
 
 			// execute merge (never pass by reference)
-			MatrixBlock ret = new MatrixBlock(b1);
+			MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
 			ret.merge(b2, false);
 			ret.examSparsity();
 			
 			// sanity check output number of non-zeros
-			if (ret.getNonZeros() != b1.getNonZeros() + b2.getNonZeros()) {
+			if (ret.getNonZeros() != b1nnz + b2nnz) {
 				throw new DMLRuntimeException("Number of non-zeros does not match: "
-						+ ret.getNonZeros() + " != " + b1.getNonZeros() + " + " + b2.getNonZeros());
+						+ ret.getNonZeros() + " != " + b1nnz + " + " + b2nnz);
 			}
 
 			return ret;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index a619a4d..38ebd7e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -899,6 +899,7 @@ public class RDDConverterUtils
 		private long _clen = -1;
 		private int _brlen = -1;
 		private int _bclen = -1;
+		private double _sparsity = 1.0;
 		private boolean _sparse = false;
 		private boolean _containsID;
 		private boolean _isVector;
@@ -908,6 +909,7 @@ public class RDDConverterUtils
 			_clen = mc.getCols();
 			_brlen = mc.getRowsPerBlock();
 			_bclen = mc.getColsPerBlock();
+			_sparsity = OptimizerUtils.getSparsity(mc);
 			_sparse = sparse;
 			_containsID = containsID;
 			_isVector = isVector;
@@ -976,7 +978,7 @@ public class RDDConverterUtils
 			for( int cix=1; cix<=ncblks; cix++ ) {
 				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
 				ix[cix-1] = new MatrixIndexes(rix, cix);
-				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse);		
+				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse,(int)(lrlen*lclen*_sparsity));
 			}
 		}
 		


[2/3] incubator-systemml git commit: [SYSTEMML-927] Fix cp write frame (schema handling on export)

Posted by mb...@apache.org.
[SYSTEMML-927] Fix cp write frame (schema handling on export)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/80a72d7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/80a72d7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/80a72d7c

Branch: refs/heads/master
Commit: 80a72d7c01888141aacd5172c53ae3bd654d30dc
Parents: 0750f35
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 21 15:18:36 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Sep 21 15:18:36 2016 -0700

----------------------------------------------------------------------
 .../runtime/controlprogram/caching/CacheableData.java     | 10 +++++++++-
 .../sysml/runtime/controlprogram/caching/FrameObject.java |  1 +
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80a72d7c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 9a136cd..3e2f6c1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.controlprogram.caching;
 import java.io.File;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
+import java.util.List;
 
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
@@ -972,6 +973,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		return (_data.getInMemorySize() <= CACHING_THRESHOLD);
 	}
 	
+	protected List<ValueType> getSchema() {
+		return null;
+	}
+	
 	/**
 	 * 
 	 */
@@ -1074,7 +1079,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 			{
 				mc = new MatrixCharacteristics(mc.getRows(), mc.getCols(), ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros());
 			}
-			MapReduceTool.writeMetaDataFile (filePathAndName + ".mtd", valueType, null, dataType, mc, oinfo, formatProperties);
+			
+			//write the actual meta data file
+			MapReduceTool.writeMetaDataFile (filePathAndName + ".mtd", valueType, 
+					getSchema(), dataType, mc, oinfo, formatProperties);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80a72d7c/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index bfccdf1..1209064 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -87,6 +87,7 @@ public class FrameObject extends CacheableData<FrameBlock>
 		super(fo);
 	}
 	
+	@Override
 	public List<ValueType> getSchema() {
 		return _schema;
 	}