You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2015/11/28 22:23:44 UTC

incubator-systemml git commit: Improved rdd checkpoint injection (avoid unnecessary input caching)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master f40974ee8 -> 649dfbfdf


Improved rdd checkpoint injection (avoid unnecessary input caching)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/649dfbfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/649dfbfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/649dfbfd

Branch: refs/heads/master
Commit: 649dfbfdff2a58b6028360a9118ab19c50491e84
Parents: f40974e
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Nov 27 20:57:12 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Nov 27 20:57:12 2015 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 46 ++++++++++++++++++++
 .../spark/CheckpointSPInstruction.java          | 28 ++++++++----
 2 files changed, 66 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/649dfbfd/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
index 5d05950..48b4699 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -29,6 +29,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.storage.RDDInfo;
 import org.apache.spark.storage.StorageLevel;
 
 import scala.Tuple2;
@@ -43,6 +44,7 @@ import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
 import com.ibm.bi.dml.runtime.controlprogram.Program;
 import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
 import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import com.ibm.bi.dml.runtime.instructions.spark.CheckpointSPInstruction;
 import com.ibm.bi.dml.runtime.instructions.spark.SPInstruction;
 import com.ibm.bi.dml.runtime.instructions.spark.data.BlockPartitioner;
 import com.ibm.bi.dml.runtime.instructions.spark.data.BroadcastObject;
@@ -944,9 +946,22 @@ public class SparkExecutionContext extends ExecutionContext
 	{
 		//get input rdd and default storage level
 		MatrixObject mo = getMatrixObject(var);
+		MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
 				getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
 		
+		//avoid unnecessary caching of input in order to reduce memory pressure
+		if( mo.getRDDHandle().allowsShortCircuitRead()
+			&& isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
+			in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
+					((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
+			
+			//investigate issue of unnecessarily large number of partitions
+			int numPartitions = CheckpointSPInstruction.getNumCoalescePartitions(mcIn, in);
+			if( numPartitions < in.partitions().size() )
+				in = in.coalesce( numPartitions );
+		}
+		
 		//repartition and persist rdd (force creation of shuffled rdd via merge)
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in);
 		out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
@@ -1000,6 +1015,36 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 	}
 	
+	/**
+	 * Checks if the rdd with the given id is marked for caching, i.e., contained in the persistent rdds of the spark context.
+	 * @param rddID id of the rdd to look up
+	 * @return true if the persistent rdds of the spark context contain the given rdd id, false otherwise
+	 */
+	private boolean isRDDMarkedForCaching( int rddID ) {
+		JavaSparkContext jsc = getSparkContext();
+		return jsc.sc().getPersistentRDDs().contains(rddID);
+	}
+	
+	/**
+	 * Checks if the rdd with the given id is marked for caching and reported as cached by the rdd storage info of the spark context.
+	 * @param rddID id of the rdd to look up
+	 * @return false if the rdd is not marked for caching or has no storage info entry, otherwise the isCached flag of its storage info
+	 */
+	private boolean isRDDCached( int rddID ) {
+		//check that rdd is marked for caching
+		JavaSparkContext jsc = getSparkContext();
+		if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
+			return false;
+		}
+		
+		//check that rdd is actually already cached
+		for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) {
+			if( info.id() == rddID )
+				return info.isCached();
+		}
+		return false; //no storage info entry found for the given rdd id
+	}
+	
 	///////////////////////////////////////////
 	// Debug String Handling (see explain); TODO to be removed
 	///////
@@ -1106,4 +1151,5 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 		
 	}
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/649dfbfd/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
index c02d441..affb0d5 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -96,14 +96,8 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		if( !in.getStorageLevel().equals( _level ) ) 
 		{
 			//investigate issue of unnecessarily large number of partitions
-			boolean coalesce = false;
-			int numPartitions = -1;
-			if( mcIn.dimsKnown(true) ) {
-				double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
-				double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mcIn);
-				numPartitions = (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
-				coalesce = ( numPartitions < in.partitions().size() );
-			}
+			int numPartitions = getNumCoalescePartitions(mcIn, in);
+			boolean coalesce = ( numPartitions < in.partitions().size() );
 			
 			//checkpoint pre-processing rdd operations
 			if( coalesce ) {
@@ -142,5 +136,23 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		}
 		sec.setVariable( output.getName(), mo);
 	}
+	
+	/**
+	 * Computes the number of partitions that callers compare against the current number of partitions of the rdd to decide on coalescing.
+	 * @param mc matrix characteristics used for the partitioned size estimate
+	 * @param in input rdd, whose current number of partitions is returned as fallback
+	 * @return ceil(estimated partitioned size / hdfs block size) but at least 1 if mc.dimsKnown(true), otherwise the current number of partitions of in
+	 */
+	public static int getNumCoalescePartitions(MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes,MatrixBlock> in)
+	{
+		if( mc.dimsKnown(true) ) {
+			double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
+			double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+			return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
+		}
+		else {
+			return in.partitions().size(); //no size estimate: result equals current size, so a "less than current size" check does not coalesce
+		}
+	}
 }