You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/02/20 04:06:48 UTC

[2/3] systemml git commit: [SYSTEMML-2155] Recompute nnz on spark checkpoints for large matrices

[SYSTEMML-2155] Recompute nnz on spark checkpoints for large matrices

This patch extends the existing spark checkpoint instruction (caching)
with a recomputation of nnz for matrices whose dimensions exceed max
integer. Such matrices are always accessed by spark instructions and
hence never get their nnz computed due to lazy evaluation in spark.
Computing nnz on checkpoints incurs almost no risk of additional
overhead because the intermediate will be checkpointed anyway and is
likely accessed many times.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b0fff8c1
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b0fff8c1
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b0fff8c1

Branch: refs/heads/master
Commit: b0fff8c18e9aaa7be188f0defdefe251a3a4d46d
Parents: bd9d7eb
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Feb 19 15:51:17 2018 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Mon Feb 19 20:06:43 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/OptimizerUtils.java     |  5 +++++
 .../instructions/spark/CheckpointSPInstruction.java    | 13 ++++++++++---
 .../runtime/instructions/spark/utils/SparkUtils.java   |  4 ++++
 3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 224752d..3a406fc 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -826,6 +826,11 @@ public class OptimizerUtils
 				|| (rl-1)/brlen == (ru-1)/brlen && (cl-1)%bclen == 0 
 				|| (rl-1)%brlen == 0 && (cl-1)/bclen == (cu-1)/bclen);
 	}
+	
+	public static boolean isValidCPDimensions( MatrixCharacteristics mc ) {
+		return isValidCPDimensions(mc.getRows(), mc.getCols());
+	}
+	
 	/**
 	 * Returns false if dimensions known to be invalid; other true
 	 * 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 3b71948..90f6c7b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -130,19 +130,26 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
 			//convert mcsr into memory-efficient csr if potentially sparse
 			if( input1.getDataType()==DataType.MATRIX 
 				&& OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
-				&& !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) 
-			{				
+				&& !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) {
 				out = ((JavaPairRDD<MatrixIndexes,MatrixBlock>)out)
 					.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
 			}
 			
 			//actual checkpoint into given storage level
 			out = out.persist( _level );
+			
+			//trigger nnz computation for datasets that are forced to spark by their dimensions
+			//(larger than MAX_INT) to handle ultra-sparse data sets during recompilation because
+			//otherwise their nnz would never be evaluated due to lazy evaluation in spark
+			if( input1.isMatrix() && mcIn.dimsKnown() 
+				&& !mcIn.dimsKnown(true) && !OptimizerUtils.isValidCPDimensions(mcIn) ) {
+				mcIn.setNonZeros(SparkUtils.getNonZeros((JavaPairRDD<MatrixIndexes,MatrixBlock>)out));
+			}
 		}
 		else {
 			out = in; //pass-through
 		}
-			
+		
 		// Step 3: In-place update of input matrix/frame rdd handle and set as output
 		// -------
 		// We use this in-place approach for two reasons. First, it is correct because our checkpoint 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b0fff8c1/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 801fe5a..b24d56f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -232,6 +232,10 @@ public class SparkUtils
 		
 		return ret;
 	}
+	
+	public static long getNonZeros(JavaPairRDD<MatrixIndexes, MatrixBlock> input) {
+		return input.values().map(b -> b.getNonZeros()).reduce((a,b)->a+b);
+	}
 
 	private static class AnalyzeCellMatrixCharacteristics implements Function<Tuple2<MatrixIndexes,MatrixCell>, MatrixCharacteristics> 
 	{