You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/12/08 12:40:45 UTC

systemml git commit: [SYSTEMML-2503/04] Fix correctness of in-place and broadcast cumagg ops

Repository: systemml
Updated Branches:
  refs/heads/master bda61b600 -> 1a58946a0


[SYSTEMML-2503/04] Fix correctness of in-place and broadcast cumagg ops

This patch fixes correctness issues of in-place cumulative aggregate
operations as well as the handling of lineage tracing in Spark cumagg
offset operations. In addition, the patch includes a minor performance
improvement that avoids unnecessary copying of offset vectors in cumagg
offset operations.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1a58946a
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1a58946a
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1a58946a

Branch: refs/heads/master
Commit: 1a58946a0a335ccae61d0cf3873a937467ae5544
Parents: bda61b6
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Dec 8 13:40:33 2018 +0100
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Dec 8 13:40:33 2018 +0100

----------------------------------------------------------------------
 .../instructions/spark/CumulativeOffsetSPInstruction.java |  9 ++++++---
 .../apache/sysml/runtime/matrix/data/LibMatrixAgg.java    | 10 ++++++----
 .../org/apache/sysml/runtime/matrix/data/MatrixBlock.java |  4 ++--
 .../java/org/apache/sysml/runtime/util/DataConverter.java |  9 ++++++++-
 4 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 1b26060..3dba53e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -32,6 +32,7 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -94,8 +95,9 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 		//get and join inputs
 		JavaPairRDD<MatrixIndexes,MatrixBlock> inData = sec.getBinaryBlockRDDHandleForVariable(input1.getName());
 		JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> joined = null;
+		boolean broadcast = _broadcast && !SparkUtils.isHashPartitioned(inData);
 		
-		if( _broadcast && !SparkUtils.isHashPartitioned(inData) ) {
+		if( broadcast ) {
 			//broadcast offsets and broadcast join with data
 			PartitionedBroadcast<MatrixBlock> inAgg = sec.getBroadcastForVariable(input2.getName());
 			joined = inData.mapToPair(new RDDCumSplitLookupFunction(inAgg,_initValue, rlen, brlen));
@@ -119,7 +121,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 			updateUnaryOutputMatrixCharacteristics(sec);
 		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
-		sec.addLineage(output.getName(), input2.getName(), _broadcast);
+		sec.addLineage(output.getName(), input2.getName(), broadcast);
 	}
 
 	private static class RDDCumSplitFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
@@ -229,7 +231,8 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 			
 			//blockwise cumagg computation, incl offset aggregation
 			return LibMatrixAgg.cumaggregateUnaryMatrix(dblkIn, blkOut, _uop,
-				DataConverter.convertToDoubleVector(oblkIn));
+				DataConverter.convertToDoubleVector(oblkIn, false,
+				((Builtin)_uop.fn).bFunc == BuiltinCode.CUMSUM));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 5e785d9..ed7d8f1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -294,14 +294,16 @@ public class LibMatrixAgg
 		final int n2 = out.clen;
 		
 		//filter empty input blocks (incl special handling for sparse-unsafe operations)
-		if( in.isEmptyBlock(false) && (agg == null || aggtype == AggType.CUM_SUM_PROD ) ) {
+		if( in.isEmpty() && (agg == null || aggtype == AggType.CUM_SUM_PROD) ) {
 			return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
 		}
 		
 		//allocate output arrays (if required)
-		if( !uop.isInplace() || in.isInSparseFormat() ) {
+		if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) {
 			out.reset(m2, n2, false); //always dense
 			out.allocateDenseBlock();
+			if( in.isEmpty() )
+				in.allocateBlock();
 		}
 		else {
 			out = in;
@@ -340,14 +342,14 @@ public class LibMatrixAgg
 		final int mk = aggtype==AggType.CUM_KAHAN_SUM?2:1;
 		
 		//filter empty input blocks (incl special handling for sparse-unsafe operations)
-		if( in.isEmptyBlock(false) ){
+		if( in.isEmpty() ){
 			return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
 		}
 
 		//Timing time = new Timing(true);
 		
 		//allocate output arrays (if required)
-		if( !uop.isInplace() || in.isInSparseFormat() ) {
+		if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) {
 			out.reset(m2, n2, false); //always dense
 			out.allocateDenseBlock();
 		}

http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 7e93598..d1f9ac9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -2656,9 +2656,9 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		if( LibMatrixAgg.isSupportedUnaryOperator(op) ) {
 			//e.g., cumsum/cumprod/cummin/cumax/cumsumprod
 			if( op.getNumThreads() > 1 )
-				LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads());
+				ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads());
 			else
-				LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
+				ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
 		}
 		else if(!sparse && !isEmptyBlock(false)
 			&& OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index f4d3fb5..0c99959 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -344,8 +344,15 @@ public class DataConverter
 		return convertToDoubleVector(mb, true);
 	}
 	
-	public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep )
+	public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep ) {
+		return convertToDoubleVector(mb, deep, false);
+	}
+	
+	public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep, boolean allowNull )
 	{
+		if( mb.isEmpty() && allowNull )
+			return null;
+		
 		int rows = mb.getNumRows();
 		int cols = mb.getNumColumns();
 		double[] ret = (!mb.isInSparseFormat() && mb.isAllocated() && !deep) ?