You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/27 07:02:38 UTC

[4/6] systemml git commit: [SYSTEMML-2282] Memory efficiency spark empty block injection

[SYSTEMML-2282] Memory efficiency spark empty block injection 

This patch improves the memory efficiency of spark empty block injection
to reduce GC overheads. For the scenario of creating partitions with all
empty blocks of a matrix (for union with non-zero block reblock), the
number of blocks is often very large (>3M per partition). Hence, we now
use more conservative partition sizes of 32MB instead of 128MB as well as
lazy iterators when creating the blocks for a single offset (i.e., for a
single partition).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/7cb43ddd
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/7cb43ddd
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/7cb43ddd

Branch: refs/heads/master
Commit: 7cb43dddda45e5e6ceae5371b9bc15b28e72ac63
Parents: 3b359c3
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 21:05:31 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:18 2018 -0700

----------------------------------------------------------------------
 .../instructions/spark/utils/SparkUtils.java     | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb43ddd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 49232da..952135e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -20,7 +20,6 @@
 
 package org.apache.sysml.runtime.instructions.spark.utils;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -188,7 +187,7 @@ public class SparkUtils
 		//compute degree of parallelism and block ranges
 		long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.min(
 				Math.max(mc.getRows(),1), mc.getRowsPerBlock()), Math.min(Math.max(mc.getCols(),1), mc.getColsPerBlock()));
-		int par = (int) Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true),
+		int par = (int) Math.min(4*Math.max(SparkExecutionContext.getDefaultParallelism(true),
 				Math.ceil(size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks());
 		long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par);
 		
@@ -273,21 +272,19 @@ public class SparkUtils
 		}
 		
 		@Override
-		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0) 
-			throws Exception 
-		{
-			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> list = new ArrayList<>();
+		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0) throws Exception {
+			//NOTE: for cases of a very large number of empty blocks per partition
+			//(e.g., >3M for 128MB partitions), it is important for low GC overhead 
+			//not to materialize these objects but return a lazy iterator instead.
 			long ncblks = _mc.getNumColBlocks();
 			long nblocksU = Math.min(arg0+_pNumBlocks, _mc.getNumBlocks());
-			for( long i=arg0; i<nblocksU; i++ ) {
+			return LongStream.range(arg0, nblocksU).mapToObj(i -> {
 				long rix = 1 + i / ncblks;
 				long cix = 1 + i % ncblks;
 				int lrlen = UtilFunctions.computeBlockSize(_mc.getRows(), rix, _mc.getRowsPerBlock());
 				int lclen = UtilFunctions.computeBlockSize(_mc.getCols(), cix, _mc.getColsPerBlock());
-				list.add(new Tuple2<>(new MatrixIndexes(rix,cix), 
-					new MatrixBlock(lrlen, lclen, true)));
-			}
-			return list.iterator();
+				return new Tuple2<>(new MatrixIndexes(rix,cix), new MatrixBlock(lrlen, lclen, true));
+			}).iterator();
 		}
 	}
 }