You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/27 18:03:36 UTC
[3/3] incubator-systemml git commit: [SYSTEMML-831] Robustness spark
mapmm (automatic input repartitioning)
[SYSTEMML-831] Robustness spark mapmm (automatic input repartitioning)
This patch adds a robustness feature to our spark mapmm instruction,
which automatically repartitions the input rdd for outer-product like
matrix multiplication in order to (1) increase the degree of parallelism
and (2) reduce the size of output partitions (to avoid 2GB limitations
of Spark). See https://issues.apache.org/jira/browse/SYSTEMML-831 for
details.
Furthermore, this also includes a minor fix for the frame function tests
(see SYSTEMML-960) with regard to single-JVM tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/055e850f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/055e850f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/055e850f
Branch: refs/heads/master
Commit: 055e850fcc8475e0690743c03708effcdc8f7ed0
Parents: 2f7a67d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Sep 26 22:56:23 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 27 10:59:46 2016 -0700
----------------------------------------------------------------------
.../instructions/spark/MapmmSPInstruction.java | 53 +++++++++++++++++++-
.../functions/frame/FrameFunctionTest.java | 3 ++
2 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/055e850f/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 5b3e9ad..9b5f3de 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -30,11 +30,13 @@ import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import org.apache.sysml.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.lops.MapMult;
import org.apache.sysml.lops.MapMult.CacheType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.functionobjects.Multiply;
import org.apache.sysml.runtime.functionobjects.Plus;
import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -125,8 +127,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
//execute mapmult instruction
JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
- if( requiresFlatMapFunction(_type, mcBc) )
+ if( requiresFlatMapFunction(_type, mcBc) ) {
+ if( requiresRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()) )
+ in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()));
out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
+ }
else if( preservesPartitioning(mcRdd, _type) )
out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(_type, in2), true);
else
@@ -175,6 +180,8 @@ public class MapmmSPInstruction extends BinarySPInstruction
}
/**
+ * Indicates if there is a need to apply a flatmap rdd operation because a single
+ * input block creates multiple output blocks.
*
* @param type
* @param mcBc
@@ -187,7 +194,51 @@ public class MapmmSPInstruction extends BinarySPInstruction
}
/**
+ * Indicates if there is a need to repartition the input RDD in order to increase the
+ * degree of parallelism or reduce the output partition size (e.g., Spark still has a
+ * 2GB limitation of partitions)
*
+ * @param type cache type; LEFT selects the row-based outer check on mcRdd, any other value the column-based check
+ * @param mcRdd matrix characteristics of the RDD input (dimensions and block sizes are read)
+ * @param mcBc matrix characteristics of the other input (presumably the broadcast input; confirm against caller)
+ * @param numPartitions current number of partitions of the input RDD, used as divisor for the per-partition output size
+ * @return true if the mm is outer-product-like, the dense output size per partition exceeds 1GB, and all dimensions are known
+ */
+ private static boolean requiresRepartitioning( CacheType type, MatrixCharacteristics mcRdd, MatrixCharacteristics mcBc, int numPartitions ) {
+ //note: as repartitioning requires data shuffling, we try to be very conservative here
+ //approach: we repartition if there is an "outer-product-like" mm (single block common dimension)
+ //and the size of output partitions (assuming dense, i.e., sparsity 1.0) exceeds a size of 1GB
+
+ boolean isLeft = (type == CacheType.LEFT);
+ boolean isOuter = isLeft ?
+ (mcRdd.getRows() <= mcRdd.getRowsPerBlock()) :
+ (mcRdd.getCols() <= mcRdd.getColsPerBlock());
+ boolean isLargeOutput = (OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
+ isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
+ isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0) / numPartitions) > 1024*1024*1024;
+ return isOuter && isLargeOutput && mcRdd.dimsKnown() && mcBc.dimsKnown();
+ }
+
+ /**
+ * Computes the number of target partitions for repartitioning input rdds in case of
+ * outer-product-like mm.
+ *
+ * @param type cache type; LEFT caps the result by the column blocks of mcRdd, any other value by its row blocks
+ * @param mcRdd matrix characteristics of the RDD input (dimensions, block sizes, and block counts are read)
+ * @param mcBc matrix characteristics of the other input (presumably the broadcast input; confirm against caller)
+ * @param numPartitions current number of partitions of the input RDD (not read by this method body)
+ * @return estimated dense output size divided by the HDFS block size, capped at the number of column/row blocks of mcRdd
+ */
+ private static int getNumRepartitioning( CacheType type, MatrixCharacteristics mcRdd, MatrixCharacteristics mcBc, int numPartitions ) {
+ boolean isLeft = (type == CacheType.LEFT);
+ long sizeOutput = (OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
+ isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
+ isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0));
+ long numParts = sizeOutput / InfrastructureAnalyzer.getHDFSBlockSize();
+ return (int)Math.min(numParts, (isLeft?mcRdd.getNumColBlocks():mcRdd.getNumRowBlocks()));
+ }
+
+ /**
*
*/
private static class RDDMapMMFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/055e850f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
index b506444..af2e75f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -88,7 +88,9 @@ public class FrameFunctionTest extends AutomatedTestBase
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
+ boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
+ OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
try
{
@@ -124,6 +126,7 @@ public class FrameFunctionTest extends AutomatedTestBase
rtplatform = platformOld;
DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
+ OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
}
}
}