You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/27 18:03:36 UTC

[3/3] incubator-systemml git commit: [SYSTEMML-831] Robustness spark mapmm (automatic input repartitioning)

[SYSTEMML-831] Robustness spark mapmm (automatic input repartitioning) 

This patch adds a robustness features to our spark mapmm instruction,
which automatically repartitions the input rdd for outer-product like
matrix multiplication in order to (1) increase the degree of parallelism
and (2) reduce the size of output partitions (to avoid 2GB limitations
of Spark). See https://issues.apache.org/jira/browse/SYSTEMML-831 for
details.

Furthermore, this also includes a minor fix for frame functions tests
(see SYSTEMML-960) with regard to single jvm tests. 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/055e850f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/055e850f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/055e850f

Branch: refs/heads/master
Commit: 055e850fcc8475e0690743c03708effcdc8f7ed0
Parents: 2f7a67d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Sep 26 22:56:23 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 27 10:59:46 2016 -0700

----------------------------------------------------------------------
 .../instructions/spark/MapmmSPInstruction.java  | 53 +++++++++++++++++++-
 .../functions/frame/FrameFunctionTest.java      |  3 ++
 2 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/055e850f/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 5b3e9ad..9b5f3de 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -30,11 +30,13 @@ import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 import org.apache.sysml.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.MapMult;
 import org.apache.sysml.lops.MapMult.CacheType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -125,8 +127,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		
 		//execute mapmult instruction
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-		if( requiresFlatMapFunction(_type, mcBc) ) 
+		if( requiresFlatMapFunction(_type, mcBc) ) {
+			if( requiresRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()) )
+				in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()));
 			out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
+		}
 		else if( preservesPartitioning(mcRdd, _type) )
 			out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(_type, in2), true);
 		else
@@ -175,6 +180,8 @@ public class MapmmSPInstruction extends BinarySPInstruction
 	}
 	
 	/**
+	 * Indicates if there is a need to apply a flatmap rdd operation because a single 
+	 * input block creates multiple output blocks.
 	 * 
 	 * @param type
 	 * @param mcBc
@@ -187,7 +194,51 @@ public class MapmmSPInstruction extends BinarySPInstruction
 	}
 	
 	/**
+	 * Indicates if there is a need to repartition the input RDD in order to increase the
+	 * degree of parallelism or reduce the output partition size (e.g., Spark still has a
+	 * 2GB limitation of partitions)
 	 * 
+	 * @param type
+	 * @param mcRdd
+	 * @param mcBc
+	 * @param numPartitions
+	 * @return
+	 */
+	private static boolean requiresRepartitioning( CacheType type, MatrixCharacteristics mcRdd, MatrixCharacteristics mcBc, int numPartitions ) {
+		//note: as repartitioning requires data shuffling, we try to be very conservative here
+		//approach: we repartition, if there is a "outer-product-like" mm (single block common dimension),
+		//the size of output partitions (assuming dense) exceeds a size of 1GB 
+		
+		boolean isLeft = (type == CacheType.LEFT);
+		boolean isOuter = isLeft ? 
+				(mcRdd.getRows() <= mcRdd.getRowsPerBlock()) :
+				(mcRdd.getCols() <= mcRdd.getColsPerBlock());
+		boolean isLargeOutput = (OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
+				isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
+				isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0) / numPartitions) > 1024*1024*1024; 
+		return isOuter && isLargeOutput && mcRdd.dimsKnown() && mcBc.dimsKnown();
+	}
+
+	/**
+	 * Computes the number of target partitions for repartitioning input rdds in case of 
+	 * outer-product-like mm. 
+	 * 
+	 * @param type
+	 * @param mcRdd
+	 * @param mcBc
+	 * @param numPartitions
+	 * @return
+	 */
+	private static int getNumRepartitioning( CacheType type, MatrixCharacteristics mcRdd, MatrixCharacteristics mcBc, int numPartitions ) {
+		boolean isLeft = (type == CacheType.LEFT);
+		long sizeOutput = (OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
+				isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
+				isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0)); 
+		long numParts = sizeOutput / InfrastructureAnalyzer.getHDFSBlockSize();
+		return (int)Math.min(numParts, (isLeft?mcRdd.getNumColBlocks():mcRdd.getNumRowBlocks()));
+	}
+	
+	/**
 	 * 
 	 */
 	private static class RDDMapMMFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/055e850f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
index b506444..af2e75f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -88,7 +88,9 @@ public class FrameFunctionTest extends AutomatedTestBase
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 	
 		boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
+		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
 		OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
+		OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 		
 		try
 		{
@@ -124,6 +126,7 @@ public class FrameFunctionTest extends AutomatedTestBase
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 			OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
+			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }