You are viewing a plain-text version of this content. The link to the canonical version (originally labeled "here") was not preserved in this copy.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/24 22:30:38 UTC

[2/2] incubator-systemml git commit: [SYSTEMML-382] MCSR-CSR sparse block conversion on rdd repartition-cache

[SYSTEMML-382] MCSR-CSR sparse block conversion on rdd repartition-cache

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2d32f6d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2d32f6d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2d32f6d5

Branch: refs/heads/master
Commit: 2d32f6d5812fba456bbae432c73c5da81a8c3e72
Parents: 1b29283
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 23 19:50:13 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 23 19:50:13 2016 -0800

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/OptimizerUtils.java  | 11 +++++++++++
 .../controlprogram/context/SparkExecutionContext.java    | 11 ++++++++++-
 .../instructions/spark/CheckpointSPInstruction.java      |  5 +----
 3 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 57d38d7..80ef5f0 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -31,6 +31,7 @@ import org.apache.sysml.hops.Hop.DataOpTypes;
 import org.apache.sysml.hops.Hop.FileFormatTypes;
 import org.apache.sysml.hops.Hop.OpOp2;
 import org.apache.sysml.hops.rewrite.HopRewriteUtils;
+import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
@@ -508,6 +509,16 @@ public class OptimizerUtils
 	}
 	
 	/**
+	 * Decides whether callers should convert sparse blocks from MCSR into the more memory-efficient CSR format (e.g., before persisting an RDD).
+	 * @param mcIn matrix characteristics of the input; its sparsity is obtained via OptimizerUtils.getSparsity(mcIn)
+	 * @return true iff Checkpoint.CHECKPOINT_SPARSE_CSR is enabled and the sparsity of mcIn is below MatrixBlock.SPARSITY_TURN_POINT
+	 */
+	public static boolean checkSparseBlockCSRConversion( MatrixCharacteristics mcIn ) {
+		return Checkpoint.CHECKPOINT_SPARSE_CSR
+			&& OptimizerUtils.getSparsity(mcIn) < MatrixBlock.SPARSITY_TURN_POINT;
+	}
+	
+	/**
 	 * Returns the number of reducers that potentially run in parallel.
 	 * This is either just the configured value (SystemML config) or
 	 * the minimum of configured value and available reduce slots. 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index b46a3af..45cb948 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -57,6 +57,7 @@ import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
+import org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -65,6 +66,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.utils.Statistics;
@@ -1037,8 +1039,15 @@ public class SparkExecutionContext extends ExecutionContext
 				in = in.coalesce( numPartitions );
 		}
 		
-		//repartition and persist rdd (force creation of shuffled rdd via merge)
+		//repartition rdd (force creation of shuffled rdd via merge)
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in);
+		
+		//convert mcsr into memory-efficient csr if potentially sparse
+		if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {				
+			out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
+		}
+		
+		//persist rdd in default storage level 
 		out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
 		   .count(); //trigger caching to prevent contention
 		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 6a0aa34..ee25052 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -22,7 +22,6 @@ package org.apache.sysml.runtime.instructions.spark;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -114,9 +113,7 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 			}
 		
 			//convert mcsr into memory-efficient csr if potentially sparse
-			if( OptimizerUtils.getSparsity(mcIn) < MatrixBlock.SPARSITY_TURN_POINT
-				&& Checkpoint.CHECKPOINT_SPARSE_CSR )
-			{				
+			if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {				
 				out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
 			}