You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/11 16:36:47 UTC

[2/2] incubator-systemml git commit: [SYSTEMML-905] Fix unary aggregates over compressed matrices w/ UC group

[SYSTEMML-905] Fix unary aggregates over compressed matrices w/ UC group

This patch fixes result correctness issues of unary aggregates over
compressed matrices w/ UC group, where the UC group resets partial
results from preceding groups. Furthermore, this also includes extended
tests to always include a UC group.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f0c91ed0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f0c91ed0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f0c91ed0

Branch: refs/heads/master
Commit: f0c91ed0d115dc727a79ec2c9e0c4a8aa420c727
Parents: 245488c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sun Sep 11 02:03:58 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sun Sep 11 02:03:58 2016 +0200

----------------------------------------------------------------------
 .../runtime/compress/ColGroupUncompressed.java   | 15 +++++++++++++++
 .../runtime/compress/CompressedMatrixBlock.java  | 19 +++++++++++++------
 .../compress/BasicUnaryAggregateTest.java        |  1 +
 .../compress/ParUnaryAggregateTest.java          |  1 +
 4 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 0ac82aa..085244c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -325,6 +326,20 @@ public class ColGroupUncompressed extends ColGroup
 	{
 		//execute unary aggregate operations
 		LibMatrixAgg.aggregateUnaryMatrix(_data, ret, op);
+		
+		//shift result into correct column indexes
+		if( op.indexFn instanceof ReduceRow ) {
+			//clear corrections
+			for( int i=0; i<_colIndexes.length; i++ )
+				if( op.aggOp.correctionExists )
+					ret.quickSetValue(0, i+_colIndexes.length, 0);
+			//shift partial results
+			for( int i=_colIndexes.length-1; i>=0; i-- ) {
+				double val = ret.quickGetValue(0, i);
+				ret.quickSetValue(0, i, 0);
+				ret.quickSetValue(0, _colIndexes[i], val);
+			}
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index fac2461..7ff3a8a 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -1049,6 +1049,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(op.getNumThreads(), false);
 			ColGroupUncompressed uc = getUncompressedColGroup();
 			try {
+				//compute uncompressed column group in parallel (otherwise bottleneck)
+				if( uc != null )
+					 ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);					
 				//compute all compressed column groups
 				ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() );
 				ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>();
@@ -1056,9 +1059,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 					tasks.add(new UnaryAggregateTask(grp, ret, op));
 				pool.invokeAll(tasks);	
 				pool.shutdown();
-				//compute uncompressed column group in parallel (otherwise bottleneck)
-				if( uc != null )
-					 ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);					
 				//aggregate partial results
 				if( !(op.indexFn instanceof ReduceRow) ){
 					KahanObject kbuff = new KahanObject(0,0);
@@ -1077,9 +1077,16 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			}
 		}
 		else {
-			for (ColGroup grp : _colGroups) {
-				grp.unaryAggregateOperations(op, ret);
-			}
+			//process UC column group
+			for (ColGroup grp : _colGroups)
+				if( grp instanceof ColGroupUncompressed )
+					grp.unaryAggregateOperations(op, ret);
+			
+			//process OLE/RLE column groups
+			for (ColGroup grp : _colGroups)
+				if( !(grp instanceof ColGroupUncompressed) )
+					grp.unaryAggregateOperations(op, ret);
+			
 		}
 		
 		//drop correction if necessary

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
index 960008d..767ca03 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicUnaryAggregateTest.java
@@ -507,6 +507,7 @@ public class BasicUnaryAggregateTest extends AutomatedTestBase
 			if( vtype==ValueType.RAND_ROUND )
 				input = TestUtils.round(input);
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
+			mb = mb.appendOperations(MatrixBlock.seqOperations(0.1, rows-0.1, 1), new MatrixBlock()); //uc group
 			
 			//prepare unary aggregate operator
 			AggregateUnaryOperator auop = null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f0c91ed0/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
index 7d65418..5224298 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParUnaryAggregateTest.java
@@ -509,6 +509,7 @@ public class ParUnaryAggregateTest extends AutomatedTestBase
 			if( vtype==ValueType.RAND_ROUND )
 				input = TestUtils.round(input);
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
+			mb = mb.appendOperations(MatrixBlock.seqOperations(0.1, rows-0.1, 1), new MatrixBlock()); //uc group
 			
 			//prepare unary aggregate operator
 			AggregateUnaryOperator auop = null;