You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/12/01 20:06:44 UTC

[2/3] systemml git commit: [SYSTEMML-2501] Sparse aggregate communication spark cumagg ops

[SYSTEMML-2501] Sparse aggregate communication spark cumagg ops

This patch reduces the garbage-collection (GC) overhead of Spark cumulative
aggregates (forward cascade) by communicating the aggregates as sparse blocks
of the target block size (a single row per target block). For example, for
100 distributed sum(cumsum(X)) operations, it reduced the total runtime from
1,006s to 887s.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/fee20fb9
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/fee20fb9
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/fee20fb9

Branch: refs/heads/master
Commit: fee20fb9b8c975bd3c3250ae7fe35c4904c0dc09
Parents: 77a7ef1
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Dec 1 19:22:19 2018 +0100
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Dec 1 19:22:19 2018 +0100

----------------------------------------------------------------------
 .../spark/CumulativeAggregateSPInstruction.java         |  4 +++-
 .../instructions/spark/utils/RDDAggregateUtils.java     |  2 +-
 .../apache/sysml/runtime/matrix/data/MatrixBlock.java   | 12 ++++++++----
 3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index a0dfb85..e8696da 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -137,7 +137,9 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
 			int rlenBlk = IntUtils.toInt( Math.min(rlenOut-(rixOut-1)*_brlen, _brlen));
 			int clenBlk = blkOut.getNumColumns();
 			int posBlk = IntUtils.toInt((ixIn.getRowIndex()-1) % _brlen);
-			MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, false);
+			
+			//construct sparse output blocks (single row in target block size)
+			MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, true);
 			blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true);
 			ixOut.setIndexes(rixOut, ixOut.getColumnIndex());
 			

http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 23b6ad9..0b01099 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -644,7 +644,7 @@ public class RDDAggregateUtils
 
 			// execute merge (never pass by reference)
 			MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
-			ret.merge(b2, false);
+			ret.merge(b2, false, false, _deep);
 			ret.examSparsity();
 			
 			// sanity check output number of non-zeros

http://git-wip-us.apache.org/repos/asf/systemml/blob/fee20fb9/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index ae5ab84..1ff2f43 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -1623,10 +1623,14 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	 * @param appendOnly ?
 	 */
 	public void merge(MatrixBlock that, boolean appendOnly) {
-		merge(that, appendOnly, false);
+		merge(that, appendOnly, false, true);
 	}
 	
 	public void merge(MatrixBlock that, boolean appendOnly, boolean par) {
+		merge(that, appendOnly, par, true);
+	}
+	
+	public void merge(MatrixBlock that, boolean appendOnly, boolean par, boolean deep) {
 		//check for empty input source (nothing to merge)
 		if( that == null || that.isEmptyBlock(false) )
 			return;
@@ -1647,7 +1651,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		//core matrix block merge (guaranteed non-empty source/target, nnz maintenance not required)
 		long nnz = nonZeros + that.nonZeros;
 		if( sparse )
-			mergeIntoSparse(that, appendOnly);
+			mergeIntoSparse(that, appendOnly, deep);
 		else if( par )
 			mergeIntoDensePar(that);
 		else
@@ -1723,7 +1727,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		}
 	}
 
-	private void mergeIntoSparse(MatrixBlock that, boolean appendOnly) {
+	private void mergeIntoSparse(MatrixBlock that, boolean appendOnly, boolean deep) {
 		SparseBlock a = sparseBlock;
 		final boolean COO = (a instanceof SparseBlockCOO);
 		final int m = rlen;
@@ -1734,7 +1738,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 				if( b.isEmpty(i) ) continue;
 				if( !COO && a.isEmpty(i) ) {
 					//copy entire sparse row (no sort required)
-					a.set(i, b.get(i), true);
+					a.set(i, b.get(i), deep);
 				}
 				else {
 					boolean appended = false;