You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/12/08 12:40:45 UTC
systemml git commit: [SYSTEMML-2503/04] Fix correctness in-place and
broadcast cumagg ops
Repository: systemml
Updated Branches:
refs/heads/master bda61b600 -> 1a58946a0
[SYSTEMML-2503/04] Fix correctness in-place and broadcast cumagg ops
This patch fixes correctness issues of in-place cumulative aggregate
operations as well as the handling of lineage tracing on Spark cumagg
offset operations. In addition, the patch also includes a minor performance
improvement that avoids unnecessary copying of offset vectors on cumagg
offset operations.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1a58946a
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1a58946a
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1a58946a
Branch: refs/heads/master
Commit: 1a58946a0a335ccae61d0cf3873a937467ae5544
Parents: bda61b6
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Dec 8 13:40:33 2018 +0100
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Dec 8 13:40:33 2018 +0100
----------------------------------------------------------------------
.../instructions/spark/CumulativeOffsetSPInstruction.java | 9 ++++++---
.../apache/sysml/runtime/matrix/data/LibMatrixAgg.java | 10 ++++++----
.../org/apache/sysml/runtime/matrix/data/MatrixBlock.java | 4 ++--
.../java/org/apache/sysml/runtime/util/DataConverter.java | 9 ++++++++-
4 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 1b26060..3dba53e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -32,6 +32,7 @@ import scala.Tuple2;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -94,8 +95,9 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
//get and join inputs
JavaPairRDD<MatrixIndexes,MatrixBlock> inData = sec.getBinaryBlockRDDHandleForVariable(input1.getName());
JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> joined = null;
+ boolean broadcast = _broadcast && !SparkUtils.isHashPartitioned(inData);
- if( _broadcast && !SparkUtils.isHashPartitioned(inData) ) {
+ if( broadcast ) {
//broadcast offsets and broadcast join with data
PartitionedBroadcast<MatrixBlock> inAgg = sec.getBroadcastForVariable(input2.getName());
joined = inData.mapToPair(new RDDCumSplitLookupFunction(inAgg,_initValue, rlen, brlen));
@@ -119,7 +121,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
updateUnaryOutputMatrixCharacteristics(sec);
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
- sec.addLineage(output.getName(), input2.getName(), _broadcast);
+ sec.addLineage(output.getName(), input2.getName(), broadcast);
}
private static class RDDCumSplitFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>
@@ -229,7 +231,8 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
//blockwise cumagg computation, incl offset aggregation
return LibMatrixAgg.cumaggregateUnaryMatrix(dblkIn, blkOut, _uop,
- DataConverter.convertToDoubleVector(oblkIn));
+ DataConverter.convertToDoubleVector(oblkIn, false,
+ ((Builtin)_uop.fn).bFunc == BuiltinCode.CUMSUM));
}
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 5e785d9..ed7d8f1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -294,14 +294,16 @@ public class LibMatrixAgg
final int n2 = out.clen;
//filter empty input blocks (incl special handling for sparse-unsafe operations)
- if( in.isEmptyBlock(false) && (agg == null || aggtype == AggType.CUM_SUM_PROD ) ) {
+ if( in.isEmpty() && (agg == null || aggtype == AggType.CUM_SUM_PROD) ) {
return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
}
//allocate output arrays (if required)
- if( !uop.isInplace() || in.isInSparseFormat() ) {
+ if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) {
out.reset(m2, n2, false); //always dense
out.allocateDenseBlock();
+ if( in.isEmpty() )
+ in.allocateBlock();
}
else {
out = in;
@@ -340,14 +342,14 @@ public class LibMatrixAgg
final int mk = aggtype==AggType.CUM_KAHAN_SUM?2:1;
//filter empty input blocks (incl special handling for sparse-unsafe operations)
- if( in.isEmptyBlock(false) ){
+ if( in.isEmpty() ){
return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
}
//Timing time = new Timing(true);
//allocate output arrays (if required)
- if( !uop.isInplace() || in.isInSparseFormat() ) {
+ if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) {
out.reset(m2, n2, false); //always dense
out.allocateDenseBlock();
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 7e93598..d1f9ac9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -2656,9 +2656,9 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
if( LibMatrixAgg.isSupportedUnaryOperator(op) ) {
//e.g., cumsum/cumprod/cummin/cumax/cumsumprod
if( op.getNumThreads() > 1 )
- LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads());
+ ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads());
else
- LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
+ ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
}
else if(!sparse && !isEmptyBlock(false)
&& OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) {
http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index f4d3fb5..0c99959 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -344,8 +344,15 @@ public class DataConverter
return convertToDoubleVector(mb, true);
}
- public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep )
+ public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep ) {
+ return convertToDoubleVector(mb, deep, false);
+ }
+
+ public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep, boolean allowNull )
{
+ if( mb.isEmpty() && allowNull )
+ return null;
+
int rows = mb.getNumRows();
int cols = mb.getNumColumns();
double[] ret = (!mb.isInSparseFormat() && mb.isAllocated() && !deep) ?