You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/24 22:30:38 UTC
[2/2] incubator-systemml git commit: [SYSTEMML-382] MCSR-CSR sparse block conversion on rdd repartition-cache
[SYSTEMML-382] MCSR-CSR sparse block conversion on rdd repartition-cache
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2d32f6d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2d32f6d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2d32f6d5
Branch: refs/heads/master
Commit: 2d32f6d5812fba456bbae432c73c5da81a8c3e72
Parents: 1b29283
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 23 19:50:13 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 23 19:50:13 2016 -0800
----------------------------------------------------------------------
src/main/java/org/apache/sysml/hops/OptimizerUtils.java | 11 +++++++++++
.../controlprogram/context/SparkExecutionContext.java | 11 ++++++++++-
.../instructions/spark/CheckpointSPInstruction.java | 5 +----
3 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 57d38d7..80ef5f0 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -31,6 +31,7 @@ import org.apache.sysml.hops.Hop.DataOpTypes;
import org.apache.sysml.hops.Hop.FileFormatTypes;
import org.apache.sysml.hops.Hop.OpOp2;
import org.apache.sysml.hops.rewrite.HopRewriteUtils;
+import org.apache.sysml.lops.Checkpoint;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
@@ -508,6 +509,16 @@ public class OptimizerUtils
}
/**
+ *
+ * @param mcIn
+ * @return
+ */
+ public static boolean checkSparseBlockCSRConversion( MatrixCharacteristics mcIn ) {
+ return Checkpoint.CHECKPOINT_SPARSE_CSR
+ && OptimizerUtils.getSparsity(mcIn) < MatrixBlock.SPARSITY_TURN_POINT;
+ }
+
+ /**
* Returns the number of reducers that potentially run in parallel.
* This is either just the configured value (SystemML config) or
* the minimum of configured value and available reduce slots.
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index b46a3af..45cb948 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -57,6 +57,7 @@ import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
+import org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction;
import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -65,6 +66,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixCell;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Statistics;
@@ -1037,8 +1039,15 @@ public class SparkExecutionContext extends ExecutionContext
in = in.coalesce( numPartitions );
}
- //repartition and persist rdd (force creation of shuffled rdd via merge)
+ //repartition rdd (force creation of shuffled rdd via merge)
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in);
+
+ //convert mcsr into memory-efficient csr if potentially sparse
+ if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {
+ out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
+ }
+
+ //persist rdd in default storage level
out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
.count(); //trigger caching to prevent contention
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 6a0aa34..ee25052 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -22,7 +22,6 @@ package org.apache.sysml.runtime.instructions.spark;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.lops.Checkpoint;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -114,9 +113,7 @@ public class CheckpointSPInstruction extends UnarySPInstruction
}
//convert mcsr into memory-efficient csr if potentially sparse
- if( OptimizerUtils.getSparsity(mcIn) < MatrixBlock.SPARSITY_TURN_POINT
- && Checkpoint.CHECKPOINT_SPARSE_CSR )
- {
+ if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {
out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
}