Posted to commits@systemml.apache.org by mb...@apache.org on 2017/05/09 00:02:51 UTC

incubator-systemml git commit: [SYSTEMML-1593] Performance spark rexpand op (load balance, no shuffle)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master ccdb61fd6 -> b8de68b74


[SYSTEMML-1593] Performance spark rexpand op (load balance, no shuffle)

This patch fixes the poor performance of spark rexpand operations for
ultra-sparse outputs. The reasons were (1) load imbalance when a small
input vector creates a huge ultra-sparse matrix, and (2) an unnecessary
shuffle of the large output matrix. Consider the following scenario,
where v is a 20M x 1 vector:

FK = table(seq(1,nrow(v)), v, nrow(v), 1e6)
print("Sum of FK = " + sum(FK))

The input is just 160MB and hence read as two partitions from HDFS.
However, the 20M x 1e6 output is large due to its blocked representation,
and only the two tasks of these partitions generate it, leading to load
imbalance. This patch improves the runtime of the above scenario on a
1+6 node cluster from 794s to 49s (including 30s for spark context
creation).
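The repartitioning rule that this patch adds to ParameterizedBuiltinSPInstruction can be sketched in isolation, together with the block arithmetic of the scenario above. This is a minimal sketch, not SystemML code: the 1000x1000 block size and the 128MB HDFS block size are assumptions, and `preferredParts` is a hypothetical stand-in for `SparkUtils.getNumPreferredPartitions(mcTmp, in)`, whose size estimate is not reproduced here.

```java
// Hypothetical sketch of the rexpand repartitioning rule (not SystemML code).
// Assumptions: 1000x1000 blocks, 128MB HDFS blocks; preferredParts stands in
// for SparkUtils.getNumPreferredPartitions(mcTmp, in).
final class RExpandRepartitionSketch {

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Returns the new partition count, or -1 if the input is left as is. */
    static int targetPartitions(long preferredParts, long inputBlocks, int currentParts) {
        // never ask for more partitions than there are input blocks
        int numParts = (int) Math.min(preferredParts, inputBlocks);
        // only repartition if this more than doubles the degree of parallelism
        return (numParts > currentParts * 2) ? numParts : -1;
    }

    public static void main(String[] args) {
        long rows = 20_000_000L;      // nrow(v)
        long maxVal = 1_000_000L;     // number of output columns
        long blen = 1000;             // assumed rows/cols per block
        long hdfsBlock = 128L << 20;  // assumed HDFS block size (128MB)

        long inputBytes = rows * 8;                             // dense doubles: 160,000,000 bytes
        int inputParts = (int) ceilDiv(inputBytes, hdfsBlock);  // 2 splits
        long inputBlocks = ceilDiv(rows, blen);                 // 20,000 input blocks
        long outColBlocks = ceilDiv(maxVal, blen);              // 1,000 output column blocks

        System.out.println("input partitions:  " + inputParts);
        System.out.println("input blocks:      " + inputBlocks);
        System.out.println("output block grid: " + inputBlocks + " x " + outColBlocks);
        System.out.println("input blocks/task: " + inputBlocks / inputParts);

        // hypothetical preferred partition counts derived from an output estimate
        for (long preferred : new long[] {3, 500, 50_000}) {
            System.out.println("preferred=" + preferred + " -> "
                + targetPartitions(preferred, inputBlocks, inputParts));
        }
    }
}
```

With two input partitions, any preferred count above 4 triggers a repartition. The count is capped at the 20,000 input blocks because additional partitions would necessarily stay empty.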


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b8de68b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b8de68b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b8de68b7

Branch: refs/heads/master
Commit: b8de68b74d2002fe4a232a7e77a28bfd121ca4eb
Parents: ccdb61f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon May 8 17:01:24 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Mon May 8 17:03:50 2017 -0700

----------------------------------------------------------------------
 .../spark/ParameterizedBuiltinSPInstruction.java        | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b8de68b7/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 5c27f60..2a80cfc 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -411,10 +411,18 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 			long brlen = mcIn.getRowsPerBlock();
 			long bclen = mcIn.getColsPerBlock();
 			
-			//execute remove empty rows/cols operation
+			//repartition input vector for higher degree of parallelism 
+			//(avoid scenarios where few input partitions create huge outputs)
+			MatrixCharacteristics mcTmp = new MatrixCharacteristics(dirRows?lmaxVal:mcIn.getRows(), 
+					dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, mcIn.getRows());
+			int numParts = (int)Math.min(SparkUtils.getNumPreferredPartitions(mcTmp, in), mcIn.getNumBlocks());
+			if( numParts > in.getNumPartitions()*2 )
+				in = in.repartition(numParts);
+			
+			//execute rexpand rows/cols operation (no shuffle required because outputs are
+			//block-aligned with the input, i.e., one input block generates n output blocks)
 			JavaPairRDD<MatrixIndexes,MatrixBlock> out = in
 					.flatMapToPair(new RDDRExpandFunction(maxVal, dirRows, cast, ignore, brlen, bclen));		
-			out = RDDAggregateUtils.mergeByKey(out, false);
 			
 			//store output rdd handle
 			sec.setRDDHandleForVariable(output.getName(), out);
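The no-shuffle argument in the comment of the diff above can be illustrated with a small model of the output block keys. This is a hypothetical sketch, not RDDRExpandFunction itself: it assumes direction "cols" (input row r with value v becomes output cell (r, v)), integer values in [1, maxVal], and 1-based block indexes as in MatrixIndexes; cast and ignore handling are omitted.

```java
import java.util.*;

// Hypothetical model of the block keys emitted when expanding one input
// block of a column vector (direction "cols"); not SystemML code.
final class RExpandKeySketch {

    // Output block key as [rowBlock, colBlock], 1-based like MatrixIndexes.
    static List<Long> key(long rowBlock, long colBlock) {
        return Arrays.asList(rowBlock, colBlock);
    }

    /**
     * Expands one input block (row-block index rowBlock, values vals):
     * local row i with value v lands in output block (rowBlock, ceil(v / bclen)).
     * Returns the non-zero count per output block key.
     */
    static Map<List<Long>, Integer> expandBlock(long rowBlock, double[] vals, long bclen) {
        Map<List<Long>, Integer> out = new HashMap<>();
        for (double v : vals) {
            long col = (long) v;                    // assumed integer in [1, maxVal]
            long colBlock = (col - 1) / bclen + 1;  // 1-based column block index
            out.merge(key(rowBlock, colBlock), 1, Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        long bclen = 1000;
        // two toy input blocks (row-block indexes 1 and 2) of a vector v
        Map<List<Long>, Integer> a = expandBlock(1, new double[] {5, 1500, 999_999}, bclen);
        Map<List<Long>, Integer> b = expandBlock(2, new double[] {5, 5, 2000}, bclen);

        // every output key keeps its input's row-block index, so the key sets
        // of different input blocks are disjoint and no merge by key is needed
        Set<List<Long>> shared = new HashSet<>(a.keySet());
        shared.retainAll(b.keySet());
        System.out.println("block 1 -> " + a);
        System.out.println("block 2 -> " + b);
        System.out.println("shared keys: " + shared.size());  // 0
    }
}
```

Because each output key inherits the row-block index of exactly one input block, no two input blocks emit the same key, which is why the RDDAggregateUtils.mergeByKey call can be dropped. For direction "rows" the roles of row and column block indexes swap, with the same conclusion.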