You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/11 16:36:47 UTC
[2/2] incubator-systemml git commit: [SYSTEMML-905] Fix unary aggregates over compressed matrices w/ UC group
[SYSTEMML-905] Fix unary aggregates over compressed matrices w/ UC group
This patch fixes incorrect results of unary aggregates over compressed
matrices that contain an uncompressed (UC) column group, where the UC group
reset the partial results of preceding groups. Furthermore, it extends the
tests to always include a UC group.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f0c91ed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f0c91ed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f0c91ed0
Branch: refs/heads/master
Commit: f0c91ed0d115dc727a79ec2c9e0c4a8aa420c727
Parents: 245488c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sun Sep 11 02:03:58 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sun Sep 11 02:03:58 2016 +0200
----------------------------------------------------------------------
.../runtime/compress/ColGroupUncompressed.java | 15 +++++++++++++++
.../runtime/compress/CompressedMatrixBlock.java | 19 +++++++++++++------
.../compress/BasicUnaryAggregateTest.java | 1 +
.../compress/ParUnaryAggregateTest.java | 1 +
4 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 0ac82aa..085244c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -325,6 +326,20 @@ public class ColGroupUncompressed extends ColGroup
{
//execute unary aggregate operations
LibMatrixAgg.aggregateUnaryMatrix(_data, ret, op);
+
+ //shift result into correct column indexes
+ if( op.indexFn instanceof ReduceRow ) {
+ //clear corrections
+ for( int i=0; i<_colIndexes.length; i++ )
+ if( op.aggOp.correctionExists )
+ ret.quickSetValue(0, i+_colIndexes.length, 0);
+ //shift partial results
+ for( int i=_colIndexes.length-1; i>=0; i-- ) {
+ double val = ret.quickGetValue(0, i);
+ ret.quickSetValue(0, i, 0);
+ ret.quickSetValue(0, _colIndexes[i], val);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index fac2461..7ff3a8a 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -1049,6 +1049,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(op.getNumThreads(), false);
ColGroupUncompressed uc = getUncompressedColGroup();
try {
+ //compute uncompressed column group in parallel (otherwise bottleneck)
+ if( uc != null )
+ ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);
//compute all compressed column groups
ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() );
ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>();
@@ -1056,9 +1059,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
tasks.add(new UnaryAggregateTask(grp, ret, op));
pool.invokeAll(tasks);
pool.shutdown();
- //compute uncompressed column group in parallel (otherwise bottleneck)
- if( uc != null )
- ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);
//aggregate partial results
if( !(op.indexFn instanceof ReduceRow) ){
KahanObject kbuff = new KahanObject(0,0);
@@ -1077,9 +1077,16 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
}
else {
- for (ColGroup grp : _colGroups) {
- grp.unaryAggregateOperations(op, ret);
- }
+ //process UC column group
+ for (ColGroup grp : _colGroups)
+ if( grp instanceof ColGroupUncompressed )
+ grp.unaryAggregateOperations(op, ret);
+
+ //process OLE/RLE column groups
+ for (ColGroup grp : _colGroups)
+ if( !(grp instanceof ColGroupUncompressed) )
+ grp.unaryAggregateOperations(op, ret);
+
}
//drop correction if necessary
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
index 960008d..767ca03 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
@@ -507,6 +507,7 @@ public class BasicUnaryAggregateTest extends AutomatedTestBase
if( vtype==ValueType.RAND_ROUND )
input = TestUtils.round(input);
MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
+ mb = mb.appendOperations(MatrixBlock.seqOperations(0.1, rows-0.1, 1), new MatrixBlock()); //uc group
//prepare unary aggregate operator
AggregateUnaryOperator auop = null;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
index 7d65418..5224298 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
@@ -509,6 +509,7 @@ public class ParUnaryAggregateTest extends AutomatedTestBase
if( vtype==ValueType.RAND_ROUND )
input = TestUtils.round(input);
MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
+ mb = mb.appendOperations(MatrixBlock.seqOperations(0.1, rows-0.1, 1), new MatrixBlock()); //uc group
//prepare unary aggregate operator
AggregateUnaryOperator auop = null;