Posted to commits@systemml.apache.org by mb...@apache.org on 2015/11/28 22:23:44 UTC
incubator-systemml git commit: Improved rdd checkpoint injection (avoid unnecessary input caching)
Repository: incubator-systemml
Updated Branches:
refs/heads/master f40974ee8 -> 649dfbfdf
Improved rdd checkpoint injection (avoid unnecessary input caching)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/649dfbfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/649dfbfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/649dfbfd
Branch: refs/heads/master
Commit: 649dfbfdff2a58b6028360a9118ab19c50491e84
Parents: f40974e
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Nov 27 20:57:12 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Nov 27 20:57:12 2015 -0800
----------------------------------------------------------------------
.../context/SparkExecutionContext.java | 46 ++++++++++++++++++++
.../spark/CheckpointSPInstruction.java | 28 ++++++++----
2 files changed, 66 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
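In essence, the checkpoint injection now reads through to the lineage child when the immediate input RDD is marked for caching but its blocks have not yet been materialized, so checkpointing no longer forces a redundant caching of that input. A minimal sketch of the idea in isolation (the helper name resolveCheckpointInput is illustrative, not part of the commit; see the SparkExecutionContext diff below for the actual change):

  //read through to the lineage child if the input is marked for caching
  //but not yet cached, avoiding a redundant persist of the intermediate
  private JavaPairRDD<MatrixIndexes,MatrixBlock> resolveCheckpointInput(
      MatrixObject mo, JavaPairRDD<MatrixIndexes,MatrixBlock> in )
  {
      RDDObject handle = mo.getRDDHandle();
      if( handle.allowsShortCircuitRead()
          && isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
          //bypass the uncached persistent RDD and use its lineage child
          //(the actual change additionally coalesces to a target #partitions)
          in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
              ((RDDObject)handle.getLineageChilds().get(0)).getRDD();
      }
      return in;
  }
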
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/649dfbfd/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
index 5d05950..48b4699 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -29,6 +29,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.RDDInfo;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
@@ -43,6 +44,7 @@ import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
import com.ibm.bi.dml.runtime.controlprogram.Program;
import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import com.ibm.bi.dml.runtime.instructions.spark.CheckpointSPInstruction;
import com.ibm.bi.dml.runtime.instructions.spark.SPInstruction;
import com.ibm.bi.dml.runtime.instructions.spark.data.BlockPartitioner;
import com.ibm.bi.dml.runtime.instructions.spark.data.BroadcastObject;
@@ -944,9 +946,22 @@ public class SparkExecutionContext extends ExecutionContext
{
//get input rdd and default storage level
MatrixObject mo = getMatrixObject(var);
+ MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
+ //avoid unnecessary caching of input in order to reduce memory pressure
+ if( mo.getRDDHandle().allowsShortCircuitRead()
+ && isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
+ in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
+ ((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
+
+ //avoid unnecessarily large number of partitions (coalesce if beneficial)
+ int numPartitions = CheckpointSPInstruction.getNumCoalescePartitions(mcIn, in);
+ if( numPartitions < in.partitions().size() )
+ in = in.coalesce( numPartitions );
+ }
+
//repartition and persist rdd (force creation of shuffled rdd via merge)
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in);
out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
@@ -1000,6 +1015,36 @@ public class SparkExecutionContext extends ExecutionContext
}
}
+ /**
+ * Checks whether the given RDD is marked for caching, i.e., registered
+ * as a persistent RDD in the Spark context.
+ *
+ * @param rddID id of the RDD to check
+ * @return true if the RDD is marked for caching
+ */
+ private boolean isRDDMarkedForCaching( int rddID ) {
+ JavaSparkContext jsc = getSparkContext();
+ return jsc.sc().getPersistentRDDs().contains(rddID);
+ }
+
+ /**
+ * Checks whether the given RDD is actually cached, i.e., marked for
+ * caching and with storage info indicating materialized blocks.
+ *
+ * @param rddID id of the RDD to check
+ * @return true if the RDD is already cached
+ */
+ private boolean isRDDCached( int rddID ) {
+ //check that rdd is marked for caching
+ JavaSparkContext jsc = getSparkContext();
+ if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
+ return false;
+ }
+
+ //check that rdd is actually already cached
+ for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) {
+ if( info.id() == rddID )
+ return info.isCached();
+ }
+ return false;
+ }
+
///////////////////////////////////////////
// Debug String Handling (see explain); TODO to be removed
///////
@@ -1106,4 +1151,5 @@ public class SparkExecutionContext extends ExecutionContext
}
}
+
}
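For context on the two new helpers: Spark distinguishes an RDD that is merely marked for caching (registered via persist()) from one whose blocks were actually materialized by an action, which is exactly the gap isRDDMarkedForCaching and isRDDCached probe via getPersistentRDDs() and getRDDStorageInfo(). A self-contained sketch of that distinction (the demo class and input data are illustrative, not part of the commit):

  import java.util.Arrays;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.storage.RDDInfo;
  import org.apache.spark.storage.StorageLevel;

  public class CacheStateDemo {
      public static void main(String[] args) {
          JavaSparkContext jsc = new JavaSparkContext(
              new SparkConf().setAppName("CacheStateDemo").setMaster("local"));
          JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3))
                                    .persist(StorageLevel.MEMORY_AND_DISK());

          //marked for caching, but no action ran yet -> not materialized
          System.out.println(isCached(jsc, rdd.id())); //false

          rdd.count(); //action materializes the cached blocks
          System.out.println(isCached(jsc, rdd.id())); //true
          jsc.stop();
      }

      //same probe as SparkExecutionContext.isRDDCached, minus the marked check
      static boolean isCached(JavaSparkContext jsc, int rddID) {
          for( RDDInfo info : jsc.sc().getRDDStorageInfo() )
              if( info.id() == rddID )
                  return info.isCached();
          return false;
      }
  }
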
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/649dfbfd/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
index c02d441..affb0d5 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -96,14 +96,8 @@ public class CheckpointSPInstruction extends UnarySPInstruction
if( !in.getStorageLevel().equals( _level ) )
{
//investigate issue of unnecessarily large number of partitions
- boolean coalesce = false;
- int numPartitions = -1;
- if( mcIn.dimsKnown(true) ) {
- double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
- double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mcIn);
- numPartitions = (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
- coalesce = ( numPartitions < in.partitions().size() );
- }
+ int numPartitions = getNumCoalescePartitions(mcIn, in);
+ boolean coalesce = ( numPartitions < in.partitions().size() );
//checkpoint pre-processing rdd operations
if( coalesce ) {
@@ -142,5 +136,23 @@ public class CheckpointSPInstruction extends UnarySPInstruction
}
sec.setVariable( output.getName(), mo);
}
+
+ /**
+ * Computes the target number of partitions for coalescing the given matrix
+ * RDD as the estimated partitioned matrix size divided by the HDFS block
+ * size; falls back to the current number of partitions if the matrix
+ * dimensions (incl nnz) are unknown.
+ *
+ * @param mc matrix characteristics of the input
+ * @param in input rdd of matrix blocks
+ * @return number of partitions for coalesce
+ */
+ public static int getNumCoalescePartitions(MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes,MatrixBlock> in)
+ {
+ if( mc.dimsKnown(true) ) {
+ double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
+ double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+ return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
+ }
+ else {
+ return in.partitions().size();
+ }
+ }
}
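A quick sanity check of the partitioning rule: for a dense 100,000 x 1,000 matrix (8 x 10^8 bytes, about 763MB at 8 bytes per value; the real estimate from OptimizerUtils.estimatePartitionedSizeExactSparsity additionally accounts for sparsity and block overheads) and a 134,217,728 byte (128MB) HDFS block size, the method yields ceil(763/128) = 6 target partitions, so any input with more than 6 partitions is coalesced before checkpointing. As arithmetic, with assumed sizes:

  //back-of-the-envelope for getNumCoalescePartitions (assumed sizes)
  double hdfsBlockSize = 128.0 * 1024 * 1024;   //128MB HDFS block size
  double matrixPSize   = 100000.0 * 1000 * 8;   //~763MB dense 100k x 1k matrix
  int numPartitions = (int) Math.max(Math.ceil(matrixPSize / hdfsBlockSize), 1);
  //numPartitions == 6; an input with e.g. 200 partitions would be coalesced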