You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/27 07:02:35 UTC

[1/6] systemml git commit: [SYSTEMML-2279] Performance spark unary aggregates (empty block filter)

Repository: systemml
Updated Branches:
  refs/heads/master 1cc72d797 -> 686e3831d


[SYSTEMML-2279] Performance spark unary aggregates (empty block filter)

This patch improves the performance of sparse-safe spark unary aggregate
operations such as sum(X) by filtering empty blocks before the actual
unary aggregate operations.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/18cc576d
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/18cc576d
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/18cc576d

Branch: refs/heads/master
Commit: 18cc576dcad813a059322d9f0bb83208ed0bb646
Parents: 1cc72d7
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 20:39:24 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:12 2018 -0700

----------------------------------------------------------------------
 .../instructions/spark/AggregateUnarySPInstruction.java   | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/18cc576d/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
index 860384f..266db7b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
@@ -34,6 +34,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.FilterDiagBlocksFunction;
+import org.apache.sysml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -91,6 +92,9 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
 		//perform aggregation if necessary and put output into symbol table
 		if( _aggtype == SparkAggType.SINGLE_BLOCK )
 		{
+			if( auop.sparseSafe )
+				out = out.filter(new FilterNonEmptyBlocksFunction());
+			
 			JavaRDD<MatrixBlock> out2 = out.map(
 					new RDDUAggFunction2(auop, mc.getRowsPerBlock(), mc.getColsPerBlock()));
 			MatrixBlock out3 = RDDAggregateUtils.aggStable(out2, aggop);
@@ -111,7 +115,7 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
 			}
 			else if( _aggtype == SparkAggType.MULTI_BLOCK ) {
 				//in case of multi-block aggregation, we always keep the correction
-				out = out.mapToPair(new RDDUAggFunction(auop, mc.getRowsPerBlock(), mc.getColsPerBlock()));			
+				out = out.mapToPair(new RDDUAggFunction(auop, mc.getRowsPerBlock(), mc.getColsPerBlock()));
 				out = RDDAggregateUtils.aggByKeyStable(out, aggop, false);
 	
 				//drop correction after aggregation if required (aggbykey creates 
@@ -124,7 +128,7 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
 			updateUnaryAggOutputMatrixCharacteristics(sec, auop.indexFn);
 			sec.setRDDHandleForVariable(output.getName(), out);	
 			sec.addLineageRDD(output.getName(), input1.getName());
-		}		
+		}
 	}
 
 	private static class RDDUAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
@@ -164,7 +168,7 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
 	/**
 	 * Similar to RDDUAggFunction but single output block.
 	 */
-	private static class RDDUAggFunction2 implements Function<Tuple2<MatrixIndexes, MatrixBlock>, MatrixBlock> 
+	public static class RDDUAggFunction2 implements Function<Tuple2<MatrixIndexes, MatrixBlock>, MatrixBlock> 
 	{
 		private static final long serialVersionUID = 2672082409287856038L;
 		


[2/6] systemml git commit: [SYSTEMML-2280] Performance ultra-sparse/empty block serialization

Posted by mb...@apache.org.
[SYSTEMML-2280] Performance ultra-sparse/empty block serialization

This patch improves the performance and memory-efficiency of ultra-sparse
and empty block serialization, especially for distributed shuffle
operations. In detail, we now refrain from creating custom serialization
buffers (with 8KB mem buffers) for these tiny blocks, which helps reduce
GC overhead. Furthermore, we added a new serialization code path for
sparse blocks in COO format, which changes the serialization complexity
from O(rlen+nnz) to O(nnz).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0bd08f29
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0bd08f29
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0bd08f29

Branch: refs/heads/master
Commit: 0bd08f29da6973d48d9d31fa6cf9eb6309b8af73
Parents: 18cc576
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 20:45:28 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:14 2018 -0700

----------------------------------------------------------------------
 .../spark/data/CorrMatrixBlock.java             |  8 ++--
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 41 ++++++++++++++------
 2 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
index 60ad927..574839b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
@@ -101,7 +101,7 @@ public class CorrMatrixBlock implements Externalizable
 	public void writeExternal(ObjectOutput os) 
 		throws IOException
 	{
-		if( os instanceof ObjectOutputStream ) {
+		if( os instanceof ObjectOutputStream && !_value.isEmptyBlock(false) ) {
 			//fast serialize of dense/sparse blocks
 			ObjectOutputStream oos = (ObjectOutputStream)os;
 			FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos);
@@ -117,9 +117,11 @@ public class CorrMatrixBlock implements Externalizable
 	private void writeHeaderAndPayload(DataOutput dos) 
 		throws IOException 
 	{
-		dos.writeByte((_corr!=null)?1:0);
+		boolean writeCorr = (_corr != null
+			&& !_corr.isEmptyBlock(false));
+		dos.writeByte(writeCorr ? 1 : 0);
 		_value.write(dos);
-		if( _corr!=null )
+		if( writeCorr )
 			_corr.write(dos);
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 9e032b6..51084ef 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -373,7 +373,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		return reset;
 	}
 	
-	public void allocateAndResetSparseRowsBlock(boolean clearNNZ, SparseBlock.Type stype)
+	public void allocateAndResetSparseBlock(boolean clearNNZ, SparseBlock.Type stype)
 	{
 		//allocate block if non-existing or too small (guaranteed to be 0-initialized)
 		if( sparseBlock == null || sparseBlock.numRows()<rlen
@@ -1874,7 +1874,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		//and to allow efficient reset without repeated sparse row allocation
 		
 		//adjust size and ensure reuse block is in CSR format
-		allocateAndResetSparseRowsBlock(false, SparseBlock.Type.CSR);
+		allocateAndResetSparseBlock(false, SparseBlock.Type.CSR);
 		
 		if( clen > 1 ) { //ULTRA-SPARSE BLOCK
 			//block: read ijv-triples (ordered by row and column) via custom 
@@ -2026,14 +2026,26 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		if( clen > 1 ) //ULTRA-SPARSE BLOCK
 		{
 			//block: write ijv-triples
-			for(int r=0;r<Math.min(rlen, sparseBlock.numRows()); r++)
-				if( !sparseBlock.isEmpty(r) )
-				{
+			if( sparseBlock instanceof SparseBlockCOO ) {
+				SparseBlockCOO sblock = (SparseBlockCOO)sparseBlock;
+				int[] rix = sblock.rowIndexes();
+				int[] cix = sblock.indexes();
+				double[] vals = sblock.values();
+				for(int i=0; i<sblock.size(); i++) {
+					//ultra-sparse block: write ijv-triples
+					out.writeInt(rix[i]);
+					out.writeInt(cix[i]);
+					out.writeDouble(vals[i]);
+					wnnz++;
+				}
+			}
+			else {
+				for(int r=0;r<Math.min(rlen, sparseBlock.numRows()); r++) {
+					if( sparseBlock.isEmpty(r) ) continue;
 					int apos = sparseBlock.pos(r);
 					int alen = sparseBlock.size(r);
 					int[] aix = sparseBlock.indexes(r);
 					double[] avals = sparseBlock.values(r);
-					
 					for(int j=apos; j<apos+alen; j++) {
 						//ultra-sparse block: write ijv-triples
 						out.writeInt(r);
@@ -2041,7 +2053,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 						out.writeDouble(avals[j]);
 						wnnz++;
 					}
-				}	
+				}
+			}
 		}
 		else //ULTRA-SPARSE COL
 		{
@@ -2209,7 +2222,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	public void readExternal(ObjectInput is) 
 		throws IOException
 	{
-		if( is instanceof ObjectInputStream )
+		if( is instanceof ObjectInputStream
+			&& !(is instanceof MatrixBlockDataInput) )
 		{
 			//fast deserialize of dense/sparse blocks
 			ObjectInputStream ois = (ObjectInputStream)is;
@@ -2232,7 +2246,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	public void writeExternal(ObjectOutput os) 
 		throws IOException
 	{
-		if( os instanceof ObjectOutputStream ) {
+		//note: in case of a CorrMatrixBlock being wrapped around a matrix
+		//block, the object output is already a FastBufferedDataOutputStream;
+		//so in general we try to avoid unnecessary buffer allocations here.
+		
+		if( os instanceof ObjectOutputStream && !isEmptyBlock(false)
+			&& !(os instanceof MatrixBlockDataOutput) ) {
 			//fast serialize of dense/sparse blocks
 			ObjectOutputStream oos = (ObjectOutputStream)os;
 			FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos);
@@ -2828,7 +2847,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	}
 	
 	@Override
-	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) {
+	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep) {
 		//assert(aggOp.correctionExists); 
 		MatrixBlock cor=checkType(correction);
 		MatrixBlock newWithCor=checkType(newWithCorrection);
@@ -2900,7 +2919,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		{
 			//e.g., ak+ kahan plus as used in sum, mapmult, mmcj and tsmm
 			if(aggOp.increOp.fn instanceof KahanPlus) {
-				LibMatrixAgg.aggregateBinaryMatrix(newWithCor, this, cor);
+				LibMatrixAgg.aggregateBinaryMatrix(newWithCor, this, cor, deep);
 			}
 			else
 			{


[6/6] systemml git commit: [SYSTEMML-2283] Fix performance issue CSE on DAGs w/ many root nodes

Posted by mb...@apache.org.
[SYSTEMML-2283] Fix performance issue CSE on DAGs w/ many root nodes

This patch fixes a severe performance issue of the compiler rewrite for
common subexpression elimination (CSE) for the case of DAGs with many
root nodes. The issues showed up on resnet 200 because this script
contains DAGs with 2000+ root nodes. In detail, the issue was due to
incorrect reset-merge of root nodes which led (depending on the
graph structure) in the worst case to processing the entire DAG as many
times as there are root nodes. This patch fixes the issue and generally
improves performance by using better keys for literal ops (value type
and name) to avoid unnecessary string concatenation. Together, these
changes improved the compilation time of resnet200 from 22min to <10s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/686e3831
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/686e3831
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/686e3831

Branch: refs/heads/master
Commit: 686e3831dee5bb87833d0192a098c97b13fe1c20
Parents: b3fef52
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 23:46:39 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:21 2018 -0700

----------------------------------------------------------------------
 .../RewriteCommonSubexpressionElimination.java  | 120 ++++++++++---------
 1 file changed, 66 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/686e3831/src/main/java/org/apache/sysml/hops/rewrite/RewriteCommonSubexpressionElimination.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteCommonSubexpressionElimination.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCommonSubexpressionElimination.java
index f0cc46e..f359422 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteCommonSubexpressionElimination.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCommonSubexpressionElimination.java
@@ -25,7 +25,9 @@ import java.util.HashMap;
 import org.apache.sysml.hops.DataOp;
 import org.apache.sysml.hops.Hop;
 import org.apache.sysml.hops.LiteralOp;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.hops.Hop.DataOpTypes;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
  * Rule: CommonSubexpressionElimination. For all statement blocks, 
@@ -36,16 +38,13 @@ import org.apache.sysml.hops.Hop.DataOpTypes;
  */
 public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 {
+	private final boolean _mergeLeafs;
 	
-	private boolean _mergeLeafs = true;
-	
-	public RewriteCommonSubexpressionElimination()
-	{
+	public RewriteCommonSubexpressionElimination() {
 		this( true ); //default full CSE
 	}
 	
-	public RewriteCommonSubexpressionElimination( boolean mergeLeafs )
-	{
+	public RewriteCommonSubexpressionElimination( boolean mergeLeafs ) {
 		_mergeLeafs = mergeLeafs;
 	}
 	
@@ -55,21 +54,23 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 		if( roots == null )
 			return null;
 		
-		HashMap<String, Hop> dataops = new HashMap<>();
-		HashMap<String, Hop> literalops = new HashMap<>(); //key: <VALUETYPE>_<LITERAL>
-		for (Hop h : roots) 
-		{
-			int cseMerged = 0;
-			if( _mergeLeafs ) {
+		//CSE pass 1: merge leaf nodes by name
+		int cseMerged = 0;
+		if( _mergeLeafs ) {
+			HashMap<String, Hop> dataops = new HashMap<>();
+			HashMap<LiteralKey, Hop> literalops = new HashMap<>();
+			for (Hop h : roots)
 				cseMerged += rule_CommonSubexpressionElimination_MergeLeafs(h, dataops, literalops);
-				h.resetVisitStatus();
-			}
-			cseMerged += rule_CommonSubexpressionElimination(h);
-				
-			if( cseMerged > 0 )
-				LOG.debug("Common Subexpression Elimination - removed "+cseMerged+" operators.");
+			Hop.resetVisitStatus(roots);
 		}
 		
+		//CSE pass 2: bottom-up merge of inner nodes
+		for (Hop h : roots) 
+			cseMerged += rule_CommonSubexpressionElimination(h);
+		
+		if( cseMerged > 0 )
+			LOG.debug("Common Subexpression Elimination - removed "+cseMerged+" operators.");
+		
 		return roots;
 	}
 
@@ -79,13 +80,16 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 		if( root == null )
 			return null;
 		
-		HashMap<String, Hop> dataops = new HashMap<>();
-		HashMap<String, Hop> literalops = new HashMap<>(); //key: <VALUETYPE>_<LITERAL>
+		//CSE pass 1: merge leaf nodes by name
 		int cseMerged = 0;
 		if( _mergeLeafs ) {
+			HashMap<String, Hop> dataops = new HashMap<>();
+			HashMap<LiteralKey, Hop> literalops = new HashMap<>();
 			cseMerged += rule_CommonSubexpressionElimination_MergeLeafs(root, dataops, literalops);
 			root.resetVisitStatus();
 		}
+		
+		//CSE pass 2: bottom-up merge of inner nodes
 		cseMerged += rule_CommonSubexpressionElimination(root);
 		
 		if( cseMerged > 0 )
@@ -94,25 +98,24 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 		return root;
 	}
 	
-	private int rule_CommonSubexpressionElimination_MergeLeafs( Hop hop, HashMap<String, Hop> dataops, HashMap<String, Hop> literalops ) 
+	private int rule_CommonSubexpressionElimination_MergeLeafs( Hop hop,
+		HashMap<String, Hop> dataops, HashMap<LiteralKey, Hop> literalops ) 
 	{
-		int ret = 0;
 		if( hop.isVisited() )
-			return ret;
-
+			return 0;
+		
+		int ret = 0;
 		if( hop.getInput().isEmpty() //LEAF NODE
 			|| HopRewriteUtils.isData(hop, DataOpTypes.TRANSIENTREAD) )
 		{
-			if( hop instanceof LiteralOp )
-			{
-				String key = hop.getValueType()+"_"+hop.getName();
+			if( hop instanceof LiteralOp ) {
+				LiteralKey key = new LiteralKey(hop.getValueType(), hop.getName());
 				if( !literalops.containsKey(key) )
 					literalops.put(key, hop);
 			}
-			else if( hop instanceof DataOp && ((DataOp)hop).isRead())
-			{
-				if(!dataops.containsKey(hop.getName()) )
-					dataops.put(hop.getName(), hop);
+			else if( hop instanceof DataOp && ((DataOp)hop).isRead()
+				&& !dataops.containsKey(hop.getName())) {
+				dataops.put(hop.getName(), hop);
 			} 
 		}
 		else //INNER NODE
@@ -121,10 +124,8 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 			for( int i=0; i<hop.getInput().size(); i++ )
 			{
 				Hop hi = hop.getInput().get(i);
-				String litKey = hi.getValueType()+"_"+hi.getName();
-				if( hi instanceof DataOp && ((DataOp)hi).isRead() && dataops.containsKey(hi.getName()) )
-				{
-					
+				LiteralKey litKey = new LiteralKey(hi.getValueType(), hi.getName());
+				if( hi instanceof DataOp && ((DataOp)hi).isRead() && dataops.containsKey(hi.getName()) ) {
 					//replace child node ref
 					Hop tmp = dataops.get(hi.getName());
 					if( tmp != hi ) { //if required
@@ -134,10 +135,8 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 						ret++;
 					}
 				}
-				else if( hi instanceof LiteralOp && literalops.containsKey(litKey) )
-				{
+				else if( hi instanceof LiteralOp && literalops.containsKey(litKey) ) {
 					Hop tmp = literalops.get(litKey);
-					
 					//replace child node ref
 					if( tmp != hi ){ //if required
 						tmp.getParent().add(hop);
@@ -148,37 +147,33 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 				}
 				
 				//recursive invocation (direct return on merged nodes)
-				ret += rule_CommonSubexpressionElimination_MergeLeafs(hi, dataops, literalops);		
-			}	
+				ret += rule_CommonSubexpressionElimination_MergeLeafs(hi, dataops, literalops);
+			}
 		}
-		
 		hop.setVisited();
 		return ret;
 	}
 
 	private int rule_CommonSubexpressionElimination( Hop hop ) 
 	{
-		int ret = 0;
 		if( hop.isVisited() )
-			return ret;
-
+			return 0;
+		
 		//step 1: merge childs recursively first
+		int ret = 0;
 		for(Hop hi : hop.getInput())
 			ret += rule_CommonSubexpressionElimination(hi);	
 		
-		
 		//step 2: merge parent nodes
 		if( hop.getParent().size()>1 ) //multiple consumers
 		{
 			//for all pairs 
 			for( int i=0; i<hop.getParent().size()-1; i++ )
-				for( int j=i+1; j<hop.getParent().size(); j++ )
-				{
+				for( int j=i+1; j<hop.getParent().size(); j++ ) {
 					Hop h1 = hop.getParent().get(i);
 					Hop h2 = hop.getParent().get(j);
 					
-					if( h1==h2 )
-					{
+					if( h1==h2 ) {
 						//do nothing, note: we should not remove redundant parent links
 						//(otherwise rewrites would need to take this property into account) 
 						
@@ -186,8 +181,7 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 						//hop.getParent().remove(j);
 						//j--;
 					}
-					else if( h1.compare(h2) ) //merge h2 into h1
-					{
+					else if( h1.compare(h2) ) { //merge h2 into h1
 						//remove h2 from parent list
 						hop.getParent().remove(j);
 						
@@ -195,8 +189,7 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 						ArrayList<Hop> parent = h2.getParent();
 						for( Hop p : parent )
 							for( int k=0; k<p.getInput().size(); k++ )
-								if( p.getInput().get(k)==h2 )
-								{
+								if( p.getInput().get(k)==h2 ) {
 									p.getInput().set(k, h1);
 									h1.getParent().add(p);
 									h1.setVisited();
@@ -213,7 +206,26 @@ public class RewriteCommonSubexpressionElimination extends HopRewriteRule
 		}
 		
 		hop.setVisited();
-
 		return ret;
 	}
+	
+	protected static class LiteralKey {
+		private final int _vtType;
+		private final String _name;
+		
+		public LiteralKey(ValueType vt, String name) {
+			_vtType = vt.ordinal();
+			_name = name;
+		}
+		@Override
+		public int hashCode() {
+			return UtilFunctions.longHashCode(_vtType, _name.hashCode());
+		}
+		@Override 
+		public boolean equals(Object o) {
+			return (o instanceof LiteralKey
+				&& _vtType == ((LiteralKey)o)._vtType
+				&& _name.equals(((LiteralKey)o)._name));
+		}
+	}
 }


[4/6] systemml git commit: [SYSTEMML-2282] Memory efficiency spark empty block injection

Posted by mb...@apache.org.
[SYSTEMML-2282] Memory efficiency spark empty block injection 

This patch improves the memory efficiency of spark empty block injection
to reduce GC overheads. For the scenario of creating partitions with all
empty blocks of a matrix (for union with non-zero block reblock), the
number of blocks is often very large (>3M per partition). Hence, we now
use more conservative partition sizes of 32 instead of 128MB as well as
lazy iterators when creating the blocks for a single offset (i.e., for a
single partition).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/7cb43ddd
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/7cb43ddd
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/7cb43ddd

Branch: refs/heads/master
Commit: 7cb43dddda45e5e6ceae5371b9bc15b28e72ac63
Parents: 3b359c3
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 21:05:31 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:18 2018 -0700

----------------------------------------------------------------------
 .../instructions/spark/utils/SparkUtils.java     | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb43ddd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 49232da..952135e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -20,7 +20,6 @@
 
 package org.apache.sysml.runtime.instructions.spark.utils;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -188,7 +187,7 @@ public class SparkUtils
 		//compute degree of parallelism and block ranges
 		long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.min(
 				Math.max(mc.getRows(),1), mc.getRowsPerBlock()), Math.min(Math.max(mc.getCols(),1), mc.getColsPerBlock()));
-		int par = (int) Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true),
+		int par = (int) Math.min(4*Math.max(SparkExecutionContext.getDefaultParallelism(true),
 				Math.ceil(size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks());
 		long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par);
 		
@@ -273,21 +272,19 @@ public class SparkUtils
 		}
 		
 		@Override
-		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0) 
-			throws Exception 
-		{
-			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> list = new ArrayList<>();
+		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0) throws Exception {
+			//NOTE: for cases of a very large number of empty blocks per partition
+			//(e.g., >3M for 128MB partitions), it is important for low GC overhead 
+			//not to materialize these objects but return a lazy iterator instead.
 			long ncblks = _mc.getNumColBlocks();
 			long nblocksU = Math.min(arg0+_pNumBlocks, _mc.getNumBlocks());
-			for( long i=arg0; i<nblocksU; i++ ) {
+			return LongStream.range(arg0, nblocksU).mapToObj(i -> {
 				long rix = 1 + i / ncblks;
 				long cix = 1 + i % ncblks;
 				int lrlen = UtilFunctions.computeBlockSize(_mc.getRows(), rix, _mc.getRowsPerBlock());
 				int lclen = UtilFunctions.computeBlockSize(_mc.getCols(), cix, _mc.getColsPerBlock());
-				list.add(new Tuple2<>(new MatrixIndexes(rix,cix), 
-					new MatrixBlock(lrlen, lclen, true)));
-			}
-			return list.iterator();
+				return new Tuple2<>(new MatrixIndexes(rix,cix), new MatrixBlock(lrlen, lclen, true));
+			}).iterator();
 		}
 	}
 }


[3/6] systemml git commit: [SYSTEMML-2281] Performance spark sumByKey incr block aggregation

Posted by mb...@apache.org.
[SYSTEMML-2281] Performance spark sumByKey incr block aggregation

This patch improves the performance of the very common spark sumByKey
primitives as used for many matrix multiplications and other operations
with global aggregation. We now avoid the unnecessary creation of dense
correction blocks, which greatly reduces GC overhead for ultra-sparse
scenarios.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/3b359c39
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/3b359c39
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/3b359c39

Branch: refs/heads/master
Commit: 3b359c39029e26cd188fd64f370d00eb102adcf8
Parents: 0bd08f2
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 20:59:28 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:15 2018 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDAggregateUtils.java          | 54 +++++++++++++++-----
 .../sysml/runtime/io/ReaderBinaryBlock.java     |  2 +-
 .../sysml/runtime/matrix/data/CM_N_COVCell.java |  2 +-
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 25 +++++----
 .../sysml/runtime/matrix/data/MatrixCell.java   |  2 +-
 .../sysml/runtime/matrix/data/MatrixValue.java  |  9 ++--
 .../matrix/data/OperationsOnMatrixValues.java   | 10 +++-
 7 files changed, 69 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/3b359c39/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 0101e26..23b6ad9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -26,13 +26,16 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction.RDDUAggFunction2;
 import org.apache.sysml.runtime.instructions.spark.data.CorrMatrixBlock;
 import org.apache.sysml.runtime.instructions.spark.data.RowMatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 
 /**
  * Collection of utility methods for aggregating binary block rdds. As a general
@@ -82,25 +85,30 @@ public class RDDAggregateUtils
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates
 		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-				in.combineByKey( new CreateCorrBlockCombinerFunction(deepCopyCombiner), 
-							     new MergeSumBlockValueFunction(), 
-							     new MergeSumBlockCombinerFunction(), numPartitions );
+			in.combineByKey( new CreateCorrBlockCombinerFunction(deepCopyCombiner), 
+				new MergeSumBlockValueFunction(deepCopyCombiner),
+				new MergeSumBlockCombinerFunction(deepCopyCombiner), numPartitions );
 		
 		//strip-off correction blocks from
 		JavaPairRDD<MatrixIndexes, MatrixBlock> out =
-				tmp.mapValues( new ExtractMatrixBlock() );
+			tmp.mapValues( new ExtractMatrixBlock() );
 		
 		//return the aggregate rdd
 		return out;
 	}
 
-	public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( JavaPairRDD<MatrixIndexes, Double> in )
+	
+	public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( JavaPairRDD<MatrixIndexes, Double> in ) {
+		return sumCellsByKeyStable(in, in.getNumPartitions());
+	}
+	
+	public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( JavaPairRDD<MatrixIndexes, Double> in, int numParts )
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates
-		JavaPairRDD<MatrixIndexes, KahanObject> tmp = 
-				in.combineByKey( new CreateCellCombinerFunction(), 
-							     new MergeSumCellValueFunction(), 
-							     new MergeSumCellCombinerFunction() );
+		JavaPairRDD<MatrixIndexes, KahanObject> tmp =
+				in.combineByKey( new CreateCellCombinerFunction(),
+					new MergeSumCellValueFunction(), 
+					new MergeSumCellCombinerFunction(), numParts);
 		
 		//strip-off correction blocks from
 		JavaPairRDD<MatrixIndexes, Double> out =
@@ -166,6 +174,12 @@ public class RDDAggregateUtils
 		return out;
 	}
 	
+	public static double max(JavaPairRDD<MatrixIndexes, MatrixBlock> in) {
+		AggregateUnaryOperator auop = InstructionUtils.parseBasicAggregateUnaryOperator("uamax");
+		MatrixBlock tmp = aggStable(in.map(new RDDUAggFunction2(auop, -1, -1)), auop.aggOp);
+		return tmp.quickGetValue(0, 0);
+	}
+	
 	/**
 	 * Merges disjoint data of all blocks per key.
 	 * 
@@ -258,6 +272,12 @@ public class RDDAggregateUtils
 		
 		private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);
 		
+		private final boolean _deep;
+
+		public MergeSumBlockValueFunction(boolean deep) {
+			_deep = deep;
+		}
+		
 		@Override
 		public CorrMatrixBlock call(CorrMatrixBlock arg0, MatrixBlock arg1) 
 			throws Exception 
@@ -270,12 +290,12 @@ public class RDDAggregateUtils
 			MatrixBlock corr = arg0.getCorrection();
 			
 			//correction block allocation on demand
-			if( corr == null )
+			if( corr == null && !arg1.isEmptyBlock(false) )
 				corr = new MatrixBlock(value.getNumRows(), value.getNumColumns(), false);
 			
 			//aggregate other input and maintain corrections 
 			//(existing value and corr are used in place)
-			OperationsOnMatrixValues.incrementalAggregation(value, corr, arg1, _op, false);
+			OperationsOnMatrixValues.incrementalAggregation(value, corr, arg1, _op, false, _deep);
 			return arg0.set(value, corr);
 		}
 	}
@@ -285,6 +305,11 @@ public class RDDAggregateUtils
 		private static final long serialVersionUID = 7664941774566119853L;
 		
 		private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);	
+		private final boolean _deep;
+
+		public MergeSumBlockCombinerFunction(boolean deep) {
+			_deep = deep;
+		}
 		
 		@Override
 		public CorrMatrixBlock call(CorrMatrixBlock arg0, CorrMatrixBlock arg1) 
@@ -297,15 +322,16 @@ public class RDDAggregateUtils
 			
 			//correction block allocation on demand (but use second if exists)
 			if( corr == null ) {
-				corr = (arg1.getCorrection()!=null)?arg1.getCorrection():
+				corr = (arg1.getCorrection()!=null) ? arg1.getCorrection() :
+					value2.isEmptyBlock(false) || (!_deep && value1.isEmptyBlock(false)) ? null :
 					new MatrixBlock(value1.getNumRows(), value1.getNumColumns(), false);
 			}
 			
 			//aggregate other input and maintain corrections
 			//(existing value and corr are used in place)
-			OperationsOnMatrixValues.incrementalAggregation(value1, corr, value2, _op, false);
+			OperationsOnMatrixValues.incrementalAggregation(value1, corr, value2, _op, false, _deep);
 			return arg0.set(value1, corr);
-		}	
+		}
 	}
 
 	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, MatrixBlock> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/3b359c39/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 9461ca1..f7a5147 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -113,7 +113,7 @@ public class ReaderBinaryBlock extends MatrixReader
 		//where ultra-sparse deserialization only reuses CSR blocks
 		MatrixBlock value = new MatrixBlock(brlen, bclen, sparse);
 		if( sparse ) {
-			value.allocateAndResetSparseRowsBlock(true, SparseBlock.Type.CSR);
+			value.allocateAndResetSparseBlock(true, SparseBlock.Type.CSR);
 			value.getSparseBlock().allocate(0, brlen*bclen);
 		}
 		return value;

http://git-wip-us.apache.org/repos/asf/systemml/blob/3b359c39/src/main/java/org/apache/sysml/runtime/matrix/data/CM_N_COVCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/CM_N_COVCell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/CM_N_COVCell.java
index a2b17ec..10956ad 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/CM_N_COVCell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/CM_N_COVCell.java
@@ -98,7 +98,7 @@ public class CM_N_COVCell extends MatrixValue implements WritableComparable
 
 	@Override
 	public void incrementalAggregate(AggregateOperator aggOp,
-			MatrixValue correction, MatrixValue newWithCorrection) {
+			MatrixValue correction, MatrixValue newWithCorrection, boolean deep) {
 		throw new RuntimeException("operation not supported for CM_N_COVCell");
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/3b359c39/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 8de89c3..5dfddbd 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -122,12 +122,23 @@ public class LibMatrixAgg
 	 * @param in input matrix
 	 * @param aggVal current aggregate values (in/out)
 	 * @param aggCorr current aggregate correction (in/out)
+	 * @param deep deep copy flag
 	 */
-	public static void aggregateBinaryMatrix(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) {
+	public static void aggregateBinaryMatrix(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr, boolean deep) {
 		//Timing time = new Timing(true);
 		//boolean saggVal = aggVal.sparse, saggCorr = aggCorr.sparse;
 		//long naggVal = aggVal.nonZeros, naggCorr = aggCorr.nonZeros;
 		
+		//common empty block handling
+		if( in.isEmptyBlock(false) ) {
+			return;
+		}
+		if( !deep && aggVal.isEmptyBlock(false) ) {
+			//shallow copy without correction allocation
+			aggVal.copyShallow(in);
+			return;
+		}
+		
 		//ensure MCSR instead of CSR for update in-place
 		if( aggVal.sparse && aggVal.isAllocated() && aggVal.getSparseBlock() instanceof SparseBlockCSR )
 			aggVal.sparseBlock = SparseBlockFactory.copySparseBlock(SparseBlock.Type.MCSR, aggVal.getSparseBlock(), true);
@@ -977,9 +988,6 @@ public class LibMatrixAgg
 	}
 
 	private static void aggregateBinaryMatrixAllDense(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) {
-		if( in.denseBlock==null || in.isEmptyBlock(false) )
-			return;
-		
 		//allocate output arrays (if required)
 		aggVal.allocateDenseBlock(); //should always stay in dense
 		aggCorr.allocateDenseBlock(); //should always stay in dense
@@ -1011,9 +1019,6 @@ public class LibMatrixAgg
 	}
 
 	private static void aggregateBinaryMatrixSparseDense(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) {
-		if( in.isEmptyBlock(false) )
-			return;
-		
 		//allocate output arrays (if required)
 		aggVal.allocateDenseBlock(); //should always stay in dense
 		aggCorr.allocateDenseBlock(); //should always stay in dense
@@ -1055,9 +1060,6 @@ public class LibMatrixAgg
 	}
 
 	private static void aggregateBinaryMatrixSparseGeneric(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) {
-		if( in.isEmptyBlock(false) )
-			return;
-		
 		SparseBlock a = in.getSparseBlock();
 		
 		KahanObject buffer1 = new KahanObject(0, 0);
@@ -1095,9 +1097,6 @@ public class LibMatrixAgg
 	}
 
 	private static void aggregateBinaryMatrixDenseGeneric(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) {
-		if( in.denseBlock==null || in.isEmptyBlock(false) )
-			return;
-		
 		final int m = in.rlen;
 		final int n = in.clen;
 		

http://git-wip-us.apache.org/repos/asf/systemml/blob/3b359c39/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixCell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixCell.java
index aa1a01d..d51fd74 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixCell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixCell.java
@@ -266,7 +266,7 @@ public class MatrixCell extends MatrixValue implements WritableComparable, Seria
 
 	@Override
 	public void incrementalAggregate(AggregateOperator aggOp,
-			MatrixValue correction, MatrixValue newWithCorrection) {
+			MatrixValue correction, MatrixValue newWithCorrection, boolean deep) {
 		throw new DMLRuntimeException("MatrixCell.incrementalAggregate should never be called");
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/3b359c39/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
index 05f0634..82b09e0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
@@ -126,9 +126,12 @@ public abstract class MatrixValue implements WritableComparable
 			int blockingFactorRow, int blockingFactorCol, MatrixIndexes indexesIn, boolean inCP);
 	
 	public abstract MatrixValue unaryOperations(UnaryOperator op, MatrixValue result);
-
-	public abstract void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, 
-			MatrixValue newWithCorrection);
+	
+	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) {
+		incrementalAggregate(aggOp, correction, newWithCorrection, true);
+	}
+	
+	public abstract void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep);
 	
 	public abstract void incrementalAggregate(AggregateOperator aggOp, MatrixValue newWithCorrection);
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/3b359c39/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 0e77b8e..bc4e969 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -196,12 +196,18 @@ public class OperationsOnMatrixValues
 	}
 	
 	public static void incrementalAggregation(MatrixValue valueAgg, MatrixValue correction, MatrixValue valueAdd, 
-			AggregateOperator op, boolean imbededCorrection)
+			AggregateOperator op, boolean imbededCorrection) {
+		incrementalAggregation(valueAgg, correction, valueAdd, op, imbededCorrection, true);
+	}
+	
+	
+	public static void incrementalAggregation(MatrixValue valueAgg, MatrixValue correction, MatrixValue valueAdd, 
+			AggregateOperator op, boolean imbededCorrection, boolean deep)
 	{
 		if(op.correctionExists)
 		{
 			if(!imbededCorrection || op.correctionLocation==CorrectionLocationType.NONE)
-				valueAgg.incrementalAggregate(op, correction, valueAdd);
+				valueAgg.incrementalAggregate(op, correction, valueAdd, deep);
 			else
 				valueAgg.incrementalAggregate(op, valueAdd);
 		}


[5/6] systemml git commit: [SYSTEMML-2279] Performance spark ctable (1-pass, fused reblock)

Posted by mb...@apache.org.
[SYSTEMML-2279] Performance spark ctable (1-pass, fused reblock)

This patch significantly improves the performance of the spark ctable
instruction. So far, we constructed double outputs, converted them to
cell, aggregated the cells, determined dimensions, and finally used a
reblock to bring the output into binary block format. For large and
ultra-sparse matrices this caused a redundant pass over the expensive
cell conversion and lots of unnecessary shuffling.

Instead, we now scan the inputs to determine the output dimensions if
necessary, locally aggregate all cells of a partition and directly
output non-zero blocks which feed together with injected empty blocks
(via union) into a fused reblock with global aggregation. In addition,
this also includes numerous smaller improvements to utilize the existing
cluster parallelism and improve memory efficiency and thus reduce the
garbage collection overhead.

On a scenario of sum(table(seq(1,1e9),1+seq(1,1e9)/1000)) with disabled
rewrites and together with the changes from SYSTEMML-2279 through 2282,
this patch improved the end-to-end runtime from 3163s to 455s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b3fef523
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b3fef523
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b3fef523

Branch: refs/heads/master
Commit: b3fef523c1057c0c82935954210686317492607c
Parents: 7cb43dd
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 21:27:11 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:20 2018 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |   4 +-
 .../java/org/apache/sysml/hops/TernaryOp.java   |  11 +-
 src/main/java/org/apache/sysml/lops/Ctable.java |  11 +-
 .../runtime/compress/CompressedMatrixBlock.java |   2 +-
 .../instructions/spark/CtableSPInstruction.java | 473 ++++++-------------
 .../spark/utils/RDDConverterUtils.java          |  11 +-
 .../spark/utils/RDDConverterUtilsExt.java       |   6 +-
 .../runtime/matrix/data/MatrixPackedCell.java   |   2 +-
 .../sysml/runtime/matrix/data/MatrixValue.java  |   4 -
 .../runtime/matrix/mapred/ReblockBuffer.java    |  15 +-
 .../sysml/runtime/util/UtilFunctions.java       |   8 +
 11 files changed, 177 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index e9af001..72f9b81 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -686,7 +686,7 @@ public class OptimizerUtils
 				mc.getCols(), 
 				mc.getRowsPerBlock(), 
 				mc.getColsPerBlock(), 
-				mc.getNonZeros());
+				mc.getNonZerosBound());
 	}
 	
 	/**
@@ -725,7 +725,7 @@ public class OptimizerUtils
 		long tnrblks = (long)Math.ceil((double)rlen/brlen);
 		long tncblks = (long)Math.ceil((double)clen/bclen);
 		long nnz = (long) Math.ceil(sp * rlen * clen);
-		if( nnz < tnrblks * tncblks ) {
+		if( nnz <= tnrblks * tncblks ) {
 			long lrlen = Math.min(rlen, brlen);
 			long lclen = Math.min(clen, bclen);
 			return nnz * estimateSizeExactSparsity(lrlen, lclen, 1)

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/hops/TernaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/TernaryOp.java b/src/main/java/org/apache/sysml/hops/TernaryOp.java
index c7c7832..b6e62ff 100644
--- a/src/main/java/org/apache/sysml/hops/TernaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/TernaryOp.java
@@ -428,18 +428,11 @@ public class TernaryOp extends Hop
 			ternary.getOutputParameters().setDimensions(_dim1, _dim2, getRowsInBlock(), getColsInBlock(), -1);
 			setLineNumbers(ternary);
 			
-			//force blocked output in CP (see below), otherwise binarycell
-			if ( et == ExecType.SPARK ) {
-				ternary.getOutputParameters().setDimensions(_dim1, _dim2, -1, -1, -1);
-				setRequiresReblock( true );
-			}
-			else
-				ternary.getOutputParameters().setDimensions(_dim1, _dim2, getRowsInBlock(), getColsInBlock(), -1);
+			//force blocked output in CP and SPARK
+			ternary.getOutputParameters().setDimensions(_dim1, _dim2, getRowsInBlock(), getColsInBlock(), -1);
 			
 			//ternary opt, w/o reblock in CP
 			setLops(ternary);
-			
-			
 		}
 		else //MR
 		{

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/lops/Ctable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Ctable.java b/src/main/java/org/apache/sysml/lops/Ctable.java
index ff9c720..127754e 100644
--- a/src/main/java/org/apache/sysml/lops/Ctable.java
+++ b/src/main/java/org/apache/sysml/lops/Ctable.java
@@ -42,7 +42,16 @@ public class Ctable extends Lop
 		CTABLE_TRANSFORM_HISTOGRAM, 
 		CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM, 
 		CTABLE_EXPAND_SCALAR_WEIGHT, 
-		INVALID 
+		INVALID;
+		public boolean hasSecondInput() {
+			return this == CTABLE_TRANSFORM
+				|| this == CTABLE_EXPAND_SCALAR_WEIGHT
+				|| this == CTABLE_TRANSFORM_SCALAR_WEIGHT;
+		}
+		public boolean hasThirdInput() {
+			return this == CTABLE_TRANSFORM
+				|| this == CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM;
+		}
 	}
 	
 	OperationTypes operation;

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index d1df033..53298c8 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -1966,7 +1966,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	}
 
 	@Override
-	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) {
+	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep) {
 		throw new DMLRuntimeException("CompressedMatrixBlock: incrementalAggregate not supported.");
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
index bf2cc91..65e619c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
@@ -19,13 +19,14 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
@@ -34,7 +35,6 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.CTable;
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
@@ -43,15 +43,10 @@ import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CTableMap;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
-import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
+import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.LongLongDoubleHashMap.ADoubleEntry;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class CtableSPInstruction extends ComputationSPInstruction {
 	private String _outDim1;
@@ -106,401 +101,209 @@ public class CtableSPInstruction extends ComputationSPInstruction {
 	public void processInstruction(ExecutionContext ec) {
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 	
+		Ctable.OperationTypes ctableOp = Ctable.findCtableOperationByInputDataTypes(
+			input1.getDataType(), input2.getDataType(), input3.getDataType());
+		ctableOp = _isExpand ? Ctable.OperationTypes.CTABLE_EXPAND_SCALAR_WEIGHT : ctableOp;
+		
 		//get input rdd handle
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null;
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = null;
-		double scalar_input2 = -1, scalar_input3 = -1;
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = !ctableOp.hasSecondInput() ? null :
+			sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
 		
-		Ctable.OperationTypes ctableOp = Ctable.findCtableOperationByInputDataTypes(
-				input1.getDataType(), input2.getDataType(), input3.getDataType());
-		ctableOp = _isExpand ? Ctable.OperationTypes.CTABLE_EXPAND_SCALAR_WEIGHT : ctableOp;
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = null;
+		double s2 = -1, s3 = -1; //scalars
 		
 		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
 		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
 		
-		// First get the block sizes and then set them as -1 to allow for binary cell reblock
-		int brlen = mc1.getRowsPerBlock();
-		int bclen = mc1.getColsPerBlock();
+		// handle known/unknown dimensions
+		long dim1 = (_dim1Literal ? (long) Double.parseDouble(_outDim1) :
+			(sec.getScalarInput(_outDim1, ValueType.DOUBLE, false)).getLongValue());
+		long dim2 = (_dim2Literal ? (long) Double.parseDouble(_outDim2) :
+			(sec.getScalarInput(_outDim2, ValueType.DOUBLE, false)).getLongValue());
+		if( dim1 == -1 && dim2 == -1 ) {
+			//note: if we need to determine the dimensions, we do so before 
+			//creating cells to avoid unnecessary caching, repeated joins, etc.
+			dim1 = (long) RDDAggregateUtils.max(in1);
+			dim2 = ctableOp.hasSecondInput() ? (long) RDDAggregateUtils.max(in2) :
+				sec.getScalarInput(input3).getLongValue();
+		}
+		mcOut.set(dim1, dim2, mc1.getRowsPerBlock(), mc1.getColsPerBlock());
+		mcOut.setNonZerosBound(mc1.getRows());
 		
-		JavaPairRDD<MatrixIndexes, ArrayList<MatrixBlock>> inputMBs = null;
-		JavaPairRDD<MatrixIndexes, CTableMap> ctables = null;
-		JavaPairRDD<MatrixIndexes, Double> bincellsNoFilter = null;
-		boolean setLineage2 = false;
-		boolean setLineage3 = false;
+		//compute preferred degree of parallelism
+		int numParts = Math.max(4 * (mc1.dimsKnown() ?
+			SparkUtils.getNumPreferredPartitions(mc1) : in1.getNumPartitions()),
+			SparkUtils.getNumPreferredPartitions(mcOut));
+		
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
 		switch(ctableOp) {
 			case CTABLE_TRANSFORM: //(VECTOR)
 				// F=ctable(A,B,W) 
-				in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
 				in3 = sec.getBinaryBlockRDDHandleForVariable( input3.getName() );
-				setLineage2 = true;
-				setLineage3 = true;
-				
-				inputMBs = in1.cogroup(in2).cogroup(in3)
-							.mapToPair(new MapThreeMBIterableIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-							scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				out = in1.join(in2, numParts).join(in3, numParts)
+					.mapValues(new MapJoinSignature3())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 			
-				
 			case CTABLE_EXPAND_SCALAR_WEIGHT: //(VECTOR)
-				// F = ctable(seq,A) or F = ctable(seq,B,1)
-				scalar_input3 = sec.getScalarInput(input3.getName(), input3.getValueType(), input3.isLiteral()).getDoubleValue();
-				if(scalar_input3 == 1) {
-					in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-					setLineage2 = true;
-					bincellsNoFilter = in2.flatMapToPair(new ExpandScalarCtableOperation(brlen));
-					break;
-				}
 			case CTABLE_TRANSFORM_SCALAR_WEIGHT: //(VECTOR/MATRIX)
 				// F = ctable(A,B) or F = ctable(A,B,1)
-				in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-				setLineage2 = true;
-
-				scalar_input3 = sec.getScalarInput(input3.getName(), input3.getValueType(), input3.isLiteral()).getDoubleValue();
-				inputMBs = in1.cogroup(in2).mapToPair(new MapTwoMBIterableIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-						scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				s3 = sec.getScalarInput(input3).getDoubleValue();
+				out = in1.join(in2, numParts).mapValues(new MapJoinSignature2())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 				
 			case CTABLE_TRANSFORM_HISTOGRAM: //(VECTOR)
 				// F=ctable(A,1) or F = ctable(A,1,1)
-				scalar_input2 = sec.getScalarInput(input2.getName(), input2.getValueType(), input2.isLiteral()).getDoubleValue();
-				scalar_input3 = sec.getScalarInput(input3.getName(), input3.getValueType(), input3.isLiteral()).getDoubleValue();
-				inputMBs = in1.mapToPair(new MapMBIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-						scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				s2 = sec.getScalarInput(input2).getDoubleValue();
+				s3 = sec.getScalarInput(input3).getDoubleValue();
+				out = in1.mapValues(new MapJoinSignature1())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 				
 			case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: //(VECTOR)
 				// F=ctable(A,1,W)
 				in3 = sec.getBinaryBlockRDDHandleForVariable( input3.getName() );
-				setLineage3 = true;
-				
-				scalar_input2 = sec.getScalarInput(input2.getName(), input2.getValueType(), input2.isLiteral()).getDoubleValue();
-				inputMBs = in1.cogroup(in3).mapToPair(new MapTwoMBIterableIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-						scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				s2 = sec.getScalarInput(input2).getDoubleValue();
+				out = in1.join(in3, numParts).mapValues(new MapJoinSignature2())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 			
 			default:
 				throw new DMLRuntimeException("Encountered an invalid ctable operation ("+ctableOp+") while executing instruction: " + this.toString());
 		}
 		
-		// Now perform aggregation on ctables to get binaryCells 
-		if(bincellsNoFilter == null && ctables != null) {
-			bincellsNoFilter =  
-					ctables.values()
-					.flatMapToPair(new ExtractBinaryCellsFromCTable());
-			bincellsNoFilter = RDDAggregateUtils.sumCellsByKeyStable(bincellsNoFilter);
-		}
-		else if(!(bincellsNoFilter != null && ctables == null)) {
-			throw new DMLRuntimeException("Incorrect ctable operation");
-		}
-		
-		// handle known/unknown dimensions
-		long outputDim1 = (_dim1Literal ? (long) Double.parseDouble(_outDim1) : (sec.getScalarInput(_outDim1, ValueType.DOUBLE, false)).getLongValue());
-		long outputDim2 = (_dim2Literal ? (long) Double.parseDouble(_outDim2) : (sec.getScalarInput(_outDim2, ValueType.DOUBLE, false)).getLongValue());
-		MatrixCharacteristics mcBinaryCells = null;
-		boolean findDimensions = (outputDim1 == -1 && outputDim2 == -1); 
-		
-		if( !findDimensions ) {
-			if((outputDim1 == -1 && outputDim2 != -1) || (outputDim1 != -1 && outputDim2 == -1))
-				throw new DMLRuntimeException("Incorrect output dimensions passed to TernarySPInstruction:" + outputDim1 + " " + outputDim2);
-			else 
-				mcBinaryCells = new MatrixCharacteristics(outputDim1, outputDim2, brlen, bclen);	
-			
-			// filtering according to given dimensions
-			bincellsNoFilter = bincellsNoFilter
-					.filter(new FilterCells(mcBinaryCells.getRows(), mcBinaryCells.getCols()));
-		}
-		
-		// convert double values to matrix cell
-		JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = bincellsNoFilter
-				.mapToPair(new ConvertToBinaryCell());
-		
-		// find dimensions if necessary (w/ cache for reblock)
-		if( findDimensions ) {						
-			binaryCells = SparkUtils.cacheBinaryCellRDD(binaryCells);
-			mcBinaryCells = SparkUtils.computeMatrixCharacteristics(binaryCells);
-		}
+		//perform fused aggregation and reblock
+		out = out.union(SparkUtils.getEmptyBlockRDD(sec.getSparkContext(), mcOut));
+		out = RDDAggregateUtils.sumByKeyStable(out, numParts, false);
 		
 		//store output rdd handle
-		sec.setRDDHandleForVariable(output.getName(), binaryCells);
-		mcOut.set(mcBinaryCells);
-		// Since we are outputing binary cells, we set block sizes = -1
-		mcOut.setRowsPerBlock(-1); mcOut.setColsPerBlock(-1);
+		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
-		if(setLineage2)
+		if( ctableOp.hasSecondInput() )
 			sec.addLineageRDD(output.getName(), input2.getName());
-		if(setLineage3)
+		if( ctableOp.hasThirdInput() )
 			sec.addLineageRDD(output.getName(), input3.getName());
-	}	
-
-	private static class ExpandScalarCtableOperation implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, Double> 
-	{
-		private static final long serialVersionUID = -12552669148928288L;
-	
-		private int _brlen;
-		
-		public ExpandScalarCtableOperation(int brlen) {
-			_brlen = brlen;
-		}
-
-		@Override
-		public Iterator<Tuple2<MatrixIndexes, Double>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
-			throws Exception 
-		{
-			MatrixIndexes ix = arg0._1();
-			MatrixBlock mb = arg0._2(); //col-vector
-			
-			//create an output cell per matrix block row (aligned w/ original source position)
-			ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new ArrayList<>();
-			CTable ctab = CTable.getCTableFnObject();
-			for( int i=0; i<mb.getNumRows(); i++ )
-			{
-				//compute global target indexes (via ctable obj for error handling consistency)
-				long row = UtilFunctions.computeCellIndex(ix.getRowIndex(), _brlen, i);
-				double v2 = mb.quickGetValue(i, 0);
-				Pair<MatrixIndexes,Double> p = ctab.execute(row, v2, 1.0);
-				
-				//indirect construction over pair to avoid tuple2 dependency in general ctable obj
-				if( p.getKey().getRowIndex() >= 1 ) //filter rejected entries
-					retVal.add(new Tuple2<>(p.getKey(), p.getValue()));
-			}
-			
-			return retVal.iterator();
-		}
-	}
-	
-	private static class MapTwoMBIterableIntoAL implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,Iterable<MatrixBlock>>>, MatrixIndexes, ArrayList<MatrixBlock>> {
-
-		private static final long serialVersionUID = 271459913267735850L;
-
-		private static MatrixBlock extractBlock(Iterable<MatrixBlock> blks, MatrixBlock retVal) throws Exception {
-			for(MatrixBlock blk1 : blks) {
-				if(retVal != null) {
-					throw new Exception("ERROR: More than 1 matrixblock found for one of the inputs at a given index");
-				}
-				retVal = blk1;
-			}
-			if(retVal == null) {
-				throw new Exception("ERROR: No matrixblock found for one of the inputs at a given index");
-			}
-			return retVal;
-		}
-		
-		@Override
-		public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-				Tuple2<MatrixIndexes, Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>>> kv)
-				throws Exception {
-			MatrixBlock in1 = null; MatrixBlock in2 = null;
-			in1 = extractBlock(kv._2._1, in1);
-			in2 = extractBlock(kv._2._2, in2);
-			// Now return unflatten AL
-			ArrayList<MatrixBlock> inputs = new ArrayList<>();
-			inputs.add(in1); inputs.add(in2);  
-			return new Tuple2<>(kv._1, inputs);
-		}
-		
 	}
-	
-	private static class MapThreeMBIterableIntoAL implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<Tuple2<Iterable<MatrixBlock>,Iterable<MatrixBlock>>>,Iterable<MatrixBlock>>>, MatrixIndexes, ArrayList<MatrixBlock>> {
-
-		private static final long serialVersionUID = -4873754507037646974L;
-		
-		private static MatrixBlock extractBlock(Iterable<MatrixBlock> blks, MatrixBlock retVal) throws Exception {
-			for(MatrixBlock blk1 : blks) {
-				if(retVal != null) {
-					throw new Exception("ERROR: More than 1 matrixblock found for one of the inputs at a given index");
-				}
-				retVal = blk1;
-			}
-			if(retVal == null) {
-				throw new Exception("ERROR: No matrixblock found for one of the inputs at a given index");
-			}
-			return retVal;
-		}
-
-		@Override
-		public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-				Tuple2<MatrixIndexes, Tuple2<Iterable<Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>>>, Iterable<MatrixBlock>>> kv)
-				throws Exception {
-			MatrixBlock in1 = null; MatrixBlock in2 = null; MatrixBlock in3 = null;
-			
-			for(Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>> blks : kv._2._1) {
-				in1 = extractBlock(blks._1, in1);
-				in2 = extractBlock(blks._2, in2);
-			}
-			in3 = extractBlock(kv._2._2, in3);
-			
-			// Now return unflatten AL
-			ArrayList<MatrixBlock> inputs = new ArrayList<>();
-			inputs.add(in1); inputs.add(in2); inputs.add(in3);  
-			return new Tuple2<>(kv._1, inputs);
-		}
-		
-	}
-	
-	private static class PerformCTableMapSideOperation implements PairFunction<Tuple2<MatrixIndexes,ArrayList<MatrixBlock>>, MatrixIndexes, CTableMap> {
 
+	private static class CTableFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>>, MatrixIndexes, MatrixBlock> 
+	{
 		private static final long serialVersionUID = 5348127596473232337L;
 
-		Ctable.OperationTypes ctableOp;
-		double scalar_input2; double scalar_input3;
-		String instString;
-		Operator optr;
-		boolean ignoreZeros;
+		private final Ctable.OperationTypes _ctableOp;
+		private final double _scalar_input2, _scalar_input3;
+		private final boolean _ignoreZeros;
+		private final long _dim1, _dim2;
+		private final int _brlen, _bclen;
 		
-		public PerformCTableMapSideOperation(Ctable.OperationTypes ctableOp, double scalar_input2, double scalar_input3, String instString, Operator optr, boolean ignoreZeros) {
-			this.ctableOp = ctableOp;
-			this.scalar_input2 = scalar_input2;
-			this.scalar_input3 = scalar_input3;
-			this.instString = instString;
-			this.optr = optr;
-			this.ignoreZeros = ignoreZeros;
+		public CTableFunction(Ctable.OperationTypes ctableOp, double s2, double s3, boolean ignoreZeros, MatrixCharacteristics mcOut) {
+			this(ctableOp, s2, s3, ignoreZeros, false, mcOut);
 		}
 		
-		private static void expectedALSize(int length, ArrayList<MatrixBlock> al) throws Exception {
-			if(al.size() != length) {
-				throw new Exception("Expected arraylist of size:" + length + ", but found " + al.size());
-			}
+		public CTableFunction(Ctable.OperationTypes ctableOp, double s2, double s3, boolean ignoreZeros, boolean emitEmpty, MatrixCharacteristics mcOut) {
+			_ctableOp = ctableOp;
+			_scalar_input2 = s2;
+			_scalar_input3 = s3;
+			_ignoreZeros = ignoreZeros;
+			_dim1 = mcOut.getRows();
+			_dim2 = mcOut.getCols();
+			_brlen = mcOut.getRowsPerBlock();
+			_bclen = mcOut.getColsPerBlock();
 		}
 		
 		@Override
-		public Tuple2<MatrixIndexes, CTableMap> call(
-				Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> kv) throws Exception {
-			CTableMap ctableResult = new CTableMap();
-			MatrixBlock ctableResultBlock = null;
-			
-			IndexedMatrixValue in1, in2, in3 = null;
-			in1 = new IndexedMatrixValue(kv._1, kv._2.get(0));
-			MatrixBlock matBlock1 = kv._2.get(0);
+		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>> arg0)
+			throws Exception
+		{
+			CTableMap map = new CTableMap(); MatrixBlock block = null;
 			
-			switch( ctableOp )
-			{
-				case CTABLE_TRANSFORM: {
-					in2 = new IndexedMatrixValue(kv._1, kv._2.get(1));
-					in3 = new IndexedMatrixValue(kv._1, kv._2.get(2));
-					expectedALSize(3, kv._2);
-					
-					if(in1==null || in2==null || in3 == null )
-						break;	
-					else
-						OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), in2.getIndexes(),
-							in2.getValue(), in3.getIndexes(), in3.getValue(), ctableResult, ctableResultBlock, optr);
-					break;
-				}
-				case CTABLE_TRANSFORM_SCALAR_WEIGHT: 
-				case CTABLE_EXPAND_SCALAR_WEIGHT:
-				{
-					// 3rd input is a scalar
-					in2 = new IndexedMatrixValue(kv._1, kv._2.get(1));
-					expectedALSize(2, kv._2);
-					if(in1==null || in2==null )
+			//local aggregation of entire partition
+			while( arg0.hasNext() ) {
+				Tuple2<MatrixIndexes,MatrixBlock[]> tmp = arg0.next();
+				MatrixIndexes ix = tmp._1();
+				MatrixBlock[] mb = tmp._2();
+				
+				switch( _ctableOp ) {
+					case CTABLE_TRANSFORM: {
+						OperationsOnMatrixValues.performCtable(ix, mb[0], ix,
+							mb[1], ix, mb[2], map, block, null);
 						break;
-					else
-						matBlock1.ctableOperations((SimpleOperator)optr, kv._2.get(1), scalar_input3, ignoreZeros, ctableResult, ctableResultBlock);
+					}
+					case CTABLE_EXPAND_SCALAR_WEIGHT:
+					case CTABLE_TRANSFORM_SCALAR_WEIGHT: {
+						// 3rd input is a scalar
+						mb[0].ctableOperations(null, mb[1], _scalar_input3, _ignoreZeros, map, block);
 						break;
-				}
-				case CTABLE_TRANSFORM_HISTOGRAM: {
-					expectedALSize(1, kv._2);
-					OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), scalar_input2, 
-							scalar_input3, ctableResult, ctableResultBlock, optr);
-					break;
-				}
-				case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: {
-					// 2nd and 3rd inputs are scalars
-					expectedALSize(2, kv._2);
-					in3 = new IndexedMatrixValue(kv._1, kv._2.get(1)); // Note: kv._2.get(1), not kv._2.get(2)
-					
-					if(in1==null || in3==null)
+					}
+					case CTABLE_TRANSFORM_HISTOGRAM: {
+						OperationsOnMatrixValues.performCtable(ix, mb[0],
+							_scalar_input2, _scalar_input3, map, block, null);
 						break;
-					else
-						OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), scalar_input2, 
-								in3.getIndexes(), in3.getValue(), ctableResult, ctableResultBlock, optr);		
-					break;
+					}
+					case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: {
+						// 2nd and 3rd inputs are scalars
+						OperationsOnMatrixValues.performCtable(ix, mb[0],
+							_scalar_input2, ix, mb[1], map, block, null);
+						break;
+					}
+					default:
+						break;
+				}
+			}
+			
+			ReblockBuffer rbuff = new ReblockBuffer(Math.min(
+				4*1024*1024, map.size()), _dim1, _dim2, _brlen, _bclen);
+			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<>();
+			
+			//append to buffer for blocked output
+			Iterator<ADoubleEntry> iter = map.getIterator();
+			while( iter.hasNext() ) {
+				ADoubleEntry e = iter.next();
+				if( e.getKey1() <= _dim1 && e.getKey2() <= _dim2 ) { 
+					if( rbuff.getSize() >= rbuff.getCapacity() )
+						flushBufferToList(rbuff, ret);
+					rbuff.appendCell(e.getKey1(), e.getKey2(), e.value);
 				}
-				default:
-					throw new DMLRuntimeException("Unrecognized opcode in Tertiary Instruction: " + instString);
 			}
-			return new Tuple2<>(kv._1, ctableResult);
+			
+			//final flush buffer
+			if( rbuff.getSize() > 0 )
+				flushBufferToList(rbuff, ret);
+			
+			return ret.iterator();
 		}
-		
-	}
 	
-	private static class MapMBIntoAL implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, ArrayList<MatrixBlock>> {
-
-		private static final long serialVersionUID = 2068398913653350125L;
-
-		@Override
-		public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-				Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
-			ArrayList<MatrixBlock> retVal = new ArrayList<>();
-			retVal.add(kv._2);
-			return new Tuple2<>(kv._1, retVal);
+		protected void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+			throws IOException, DMLRuntimeException
+		{
+			rbuff.flushBufferToBinaryBlocks().stream() // prevent library dependencies
+				.map(b -> SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b));
 		}
-		
 	}
 	
-	private static class ExtractBinaryCellsFromCTable implements PairFlatMapFunction<CTableMap, MatrixIndexes, Double> {
+	public static class MapJoinSignature1 implements Function<MatrixBlock, MatrixBlock[]> {
+		private static final long serialVersionUID = -8819908424033945028L;
 
-		private static final long serialVersionUID = -5933677686766674444L;
-		
 		@Override
-		public Iterator<Tuple2<MatrixIndexes, Double>> call(CTableMap ctableMap)
-				throws Exception {
-			ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new ArrayList<>();
-			Iterator<ADoubleEntry> iter = ctableMap.getIterator();
-			while( iter.hasNext() ) {
-				ADoubleEntry ijv = iter.next();
-				long i = ijv.getKey1();
-				long j =  ijv.getKey2();
-				double v =  ijv.value;
-				retVal.add(new Tuple2<>(new MatrixIndexes(i, j), v));
-			}
-			return retVal.iterator();
+		public MatrixBlock[] call(MatrixBlock v1) throws Exception {
+			return ArrayUtils.toArray(v1);
 		}
-		
 	}
 	
-	private static class ConvertToBinaryCell implements PairFunction<Tuple2<MatrixIndexes,Double>, MatrixIndexes, MatrixCell> {
-
-		private static final long serialVersionUID = 7481186480851982800L;
-		
+	public static class MapJoinSignature2 implements Function<Tuple2<MatrixBlock,MatrixBlock>, MatrixBlock[]> {
+		private static final long serialVersionUID = 7690448020081435520L;
 		@Override
-		public Tuple2<MatrixIndexes, MatrixCell> call(
-				Tuple2<MatrixIndexes, Double> kv) throws Exception {
-			
-			MatrixCell cell = new MatrixCell(kv._2().doubleValue());
-			return new Tuple2<>(kv._1(), cell);
+		public MatrixBlock[] call(Tuple2<MatrixBlock, MatrixBlock> v1) throws Exception {
+			return ArrayUtils.toArray(v1._1(), v1._2());
 		}
-		
 	}
 	
-	private static class FilterCells implements Function<Tuple2<MatrixIndexes,Double>, Boolean> {
-		private static final long serialVersionUID = 108448577697623247L;
-
-		long rlen; long clen;
-		public FilterCells(long rlen, long clen) {
-			this.rlen = rlen;
-			this.clen = clen;
-		}
-		
+	public static class MapJoinSignature3 implements Function<Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>, MatrixBlock[]> {
+		private static final long serialVersionUID = -5222678882354280164L;
 		@Override
-		public Boolean call(Tuple2<MatrixIndexes, Double> kv) throws Exception {
-			if(kv._1.getRowIndex() <= 0 || kv._1.getColumnIndex() <= 0) {
-				throw new Exception("Incorrect cell values in TernarySPInstruction:" + kv._1);
-			}
-			if(kv._1.getRowIndex() <= rlen && kv._1.getColumnIndex() <= clen) {
-				return true;
-			}
-			return false;
+		public MatrixBlock[] call(Tuple2<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> v1) throws Exception {
+			return ArrayUtils.toArray(v1._1()._1(), v1._1()._2(), v1._2());
 		}
-		
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 6e647ee..29cd567 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -65,7 +65,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
@@ -491,10 +490,8 @@ public class RDDConverterUtils
 		protected void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
 			throws IOException, DMLRuntimeException
 		{
-			//temporary list of indexed matrix values to prevent library dependencies
-			ArrayList<IndexedMatrixValue> rettmp = new ArrayList<>();
-			rbuff.flushBufferToBinaryBlocks(rettmp);
-			ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
+			rbuff.flushBufferToBinaryBlocks().stream() // prevent library dependencies
+				.map(b -> SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b));
 		}
 	}
 
@@ -574,11 +571,11 @@ public class RDDConverterUtils
 	/////////////////////////////////
 	// BINARYCELL-SPECIFIC FUNCTIONS
 
-	private static class BinaryCellToBinaryBlockFunction extends CellToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixCell>>,MatrixIndexes,MatrixBlock> 
+	public static class BinaryCellToBinaryBlockFunction extends CellToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixCell>>,MatrixIndexes,MatrixBlock> 
 	{
 		private static final long serialVersionUID = 3928810989462198243L;
 
-		protected BinaryCellToBinaryBlockFunction(MatrixCharacteristics mc) {
+		public BinaryCellToBinaryBlockFunction(MatrixCharacteristics mc) {
 			super(mc);
 		}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index 77800e4..4871aee 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -389,10 +389,8 @@ public class RDDConverterUtilsExt
 		private static void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
 			throws IOException, DMLRuntimeException
 		{
-			//temporary list of indexed matrix values to prevent library dependencies
-			ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>();
-			rbuff.flushBufferToBinaryBlocks(rettmp);
-			ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
+			rbuff.flushBufferToBinaryBlocks().stream() // prevent library dependencies
+				.map(b -> SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
index ecf44b6..e8d0316 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
@@ -73,7 +73,7 @@ public class MatrixPackedCell extends MatrixCell
 	//with corrections
 	@Override
 	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, 
-			MatrixValue newWithCorrection) {
+			MatrixValue newWithCorrection, boolean deep) {
 		incrementalAggregate(aggOp, newWithCorrection);
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
index 82b09e0..88a918d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
@@ -127,10 +127,6 @@ public abstract class MatrixValue implements WritableComparable
 	
 	public abstract MatrixValue unaryOperations(UnaryOperator op, MatrixValue result);
 	
-	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) {
-		incrementalAggregate(aggOp, correction, newWithCorrection, true);
-	}
-	
 	public abstract void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep);
 	
 	public abstract void incrementalAggregate(AggregateOperator aggOp, MatrixValue newWithCorrection);

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
index 7f273fb..8d6f2e6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
@@ -23,8 +23,10 @@ package org.apache.sysml.runtime.matrix.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -223,11 +225,11 @@ public class ReblockBuffer
 		_count = 0;
 	}
 
-	public void flushBufferToBinaryBlocks( ArrayList<IndexedMatrixValue> outList ) 
+	public List<IndexedMatrixValue> flushBufferToBinaryBlocks() 
 		throws IOException, DMLRuntimeException
 	{
 		if( _count == 0 )
-			return;
+			return Collections.emptyList();
 		
 		//Step 1) sort reblock buffer (blockwise, no in-block sorting!)
 		Arrays.sort( _buff, 0 ,_count, new ReblockBufferComparator() );
@@ -248,7 +250,8 @@ public class ReblockBuffer
 			}
 		}
 		
-		//Step 3) output blocks 
+		//Step 3) output blocks
+		ArrayList<IndexedMatrixValue> ret = new ArrayList<>();
 		boolean sparse = MatrixBlock.evalSparseFormatInMemory(_brlen, _bclen, _count/numBlocks);
 		MatrixIndexes tmpIx = new MatrixIndexes();
 		MatrixBlock tmpBlock = new MatrixBlock();
@@ -262,7 +265,7 @@ public class ReblockBuffer
 			
 			//output block and switch to next index pair
 			if( bi != cbi || bj != cbj ) {
-				outputBlock(outList, tmpIx, tmpBlock);
+				outputBlock(ret, tmpIx, tmpBlock);
 				cbi = bi;
 				cbj = bj;
 				tmpIx = new MatrixIndexes(bi, bj);
@@ -278,9 +281,9 @@ public class ReblockBuffer
 		}
 		
 		//output last block 
-		outputBlock(outList, tmpIx, tmpBlock);
-		
+		outputBlock(ret, tmpIx, tmpBlock);
 		_count = 0;
+		return ret;
 	}
 
 	private static void outputBlock( OutputCollector<Writable, Writable> out, MatrixIndexes key, TaggedAdaptivePartialBlock value, MatrixBlock block ) 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index f0911bb..1947c00 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -22,8 +22,11 @@ package org.apache.sysml.runtime.util;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -663,4 +666,9 @@ public class UtilFunctions
 			ret.add(element);
 		return ret;
 	}
+	
+	public static <T> Stream<T> getStream(Iterator<T> iter) {
+		Iterable<T> iterable = () -> iter;
+		return StreamSupport.stream(iterable.spliterator(), false);
+	}
 }