You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/27 07:02:39 UTC

[5/6] systemml git commit: [SYSTEMML-2279] Performance spark ctable (1-pass, fused reblock)

[SYSTEMML-2279] Performance spark ctable (1-pass, fused reblock)

This patch significantly improves the performance of the spark ctable
instruction. Previously, we constructed double-valued outputs, converted
them to cells, aggregated the cells, determined the dimensions, and
finally used a reblock to bring the output into binary block format. For
large and ultra-sparse matrices, this caused a redundant pass over the
expensively converted cells and lots of unnecessary shuffling.

Instead, we now scan the inputs to determine the output dimensions (if
necessary), locally aggregate all cells of a partition, and directly
output non-zero blocks. These blocks are combined (via union) with
injected empty blocks and fed into a fused reblock with global
aggregation. In addition, this patch includes numerous smaller
improvements that better utilize the existing cluster parallelism and
improve memory efficiency, thus reducing garbage collection overhead.

On the scenario sum(table(seq(1,1e9),1+seq(1,1e9)/1000)) with rewrites
disabled, and together with the changes from SYSTEMML-2279 through
SYSTEMML-2282, this patch improved the end-to-end runtime from 3163s to
455s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b3fef523
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b3fef523
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b3fef523

Branch: refs/heads/master
Commit: b3fef523c1057c0c82935954210686317492607c
Parents: 7cb43dd
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 21:27:11 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:20 2018 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |   4 +-
 .../java/org/apache/sysml/hops/TernaryOp.java   |  11 +-
 src/main/java/org/apache/sysml/lops/Ctable.java |  11 +-
 .../runtime/compress/CompressedMatrixBlock.java |   2 +-
 .../instructions/spark/CtableSPInstruction.java | 473 ++++++-------------
 .../spark/utils/RDDConverterUtils.java          |  11 +-
 .../spark/utils/RDDConverterUtilsExt.java       |   6 +-
 .../runtime/matrix/data/MatrixPackedCell.java   |   2 +-
 .../sysml/runtime/matrix/data/MatrixValue.java  |   4 -
 .../runtime/matrix/mapred/ReblockBuffer.java    |  15 +-
 .../sysml/runtime/util/UtilFunctions.java       |   8 +
 11 files changed, 177 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index e9af001..72f9b81 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -686,7 +686,7 @@ public class OptimizerUtils
 				mc.getCols(), 
 				mc.getRowsPerBlock(), 
 				mc.getColsPerBlock(), 
-				mc.getNonZeros());
+				mc.getNonZerosBound());
 	}
 	
 	/**
@@ -725,7 +725,7 @@ public class OptimizerUtils
 		long tnrblks = (long)Math.ceil((double)rlen/brlen);
 		long tncblks = (long)Math.ceil((double)clen/bclen);
 		long nnz = (long) Math.ceil(sp * rlen * clen);
-		if( nnz < tnrblks * tncblks ) {
+		if( nnz <= tnrblks * tncblks ) {
 			long lrlen = Math.min(rlen, brlen);
 			long lclen = Math.min(clen, bclen);
 			return nnz * estimateSizeExactSparsity(lrlen, lclen, 1)

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/hops/TernaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/TernaryOp.java b/src/main/java/org/apache/sysml/hops/TernaryOp.java
index c7c7832..b6e62ff 100644
--- a/src/main/java/org/apache/sysml/hops/TernaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/TernaryOp.java
@@ -428,18 +428,11 @@ public class TernaryOp extends Hop
 			ternary.getOutputParameters().setDimensions(_dim1, _dim2, getRowsInBlock(), getColsInBlock(), -1);
 			setLineNumbers(ternary);
 			
-			//force blocked output in CP (see below), otherwise binarycell
-			if ( et == ExecType.SPARK ) {
-				ternary.getOutputParameters().setDimensions(_dim1, _dim2, -1, -1, -1);
-				setRequiresReblock( true );
-			}
-			else
-				ternary.getOutputParameters().setDimensions(_dim1, _dim2, getRowsInBlock(), getColsInBlock(), -1);
+			//force blocked output in CP and SPARK
+			ternary.getOutputParameters().setDimensions(_dim1, _dim2, getRowsInBlock(), getColsInBlock(), -1);
 			
 			//ternary opt, w/o reblock in CP
 			setLops(ternary);
-			
-			
 		}
 		else //MR
 		{

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/lops/Ctable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Ctable.java b/src/main/java/org/apache/sysml/lops/Ctable.java
index ff9c720..127754e 100644
--- a/src/main/java/org/apache/sysml/lops/Ctable.java
+++ b/src/main/java/org/apache/sysml/lops/Ctable.java
@@ -42,7 +42,16 @@ public class Ctable extends Lop
 		CTABLE_TRANSFORM_HISTOGRAM, 
 		CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM, 
 		CTABLE_EXPAND_SCALAR_WEIGHT, 
-		INVALID 
+		INVALID;
+		public boolean hasSecondInput() {
+			return this == CTABLE_TRANSFORM
+				|| this == CTABLE_EXPAND_SCALAR_WEIGHT
+				|| this == CTABLE_TRANSFORM_SCALAR_WEIGHT;
+		}
+		public boolean hasThirdInput() {
+			return this == CTABLE_TRANSFORM
+				|| this == CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM;
+		}
 	}
 	
 	OperationTypes operation;

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index d1df033..53298c8 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -1966,7 +1966,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	}
 
 	@Override
-	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) {
+	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep) {
 		throw new DMLRuntimeException("CompressedMatrixBlock: incrementalAggregate not supported.");
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
index bf2cc91..65e619c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
@@ -19,13 +19,14 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
@@ -34,7 +35,6 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.CTable;
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
@@ -43,15 +43,10 @@ import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CTableMap;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
-import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
+import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.LongLongDoubleHashMap.ADoubleEntry;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class CtableSPInstruction extends ComputationSPInstruction {
 	private String _outDim1;
@@ -106,401 +101,209 @@ public class CtableSPInstruction extends ComputationSPInstruction {
 	public void processInstruction(ExecutionContext ec) {
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 	
+		Ctable.OperationTypes ctableOp = Ctable.findCtableOperationByInputDataTypes(
+			input1.getDataType(), input2.getDataType(), input3.getDataType());
+		ctableOp = _isExpand ? Ctable.OperationTypes.CTABLE_EXPAND_SCALAR_WEIGHT : ctableOp;
+		
 		//get input rdd handle
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null;
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = null;
-		double scalar_input2 = -1, scalar_input3 = -1;
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = !ctableOp.hasSecondInput() ? null :
+			sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
 		
-		Ctable.OperationTypes ctableOp = Ctable.findCtableOperationByInputDataTypes(
-				input1.getDataType(), input2.getDataType(), input3.getDataType());
-		ctableOp = _isExpand ? Ctable.OperationTypes.CTABLE_EXPAND_SCALAR_WEIGHT : ctableOp;
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = null;
+		double s2 = -1, s3 = -1; //scalars
 		
 		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
 		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
 		
-		// First get the block sizes and then set them as -1 to allow for binary cell reblock
-		int brlen = mc1.getRowsPerBlock();
-		int bclen = mc1.getColsPerBlock();
+		// handle known/unknown dimensions
+		long dim1 = (_dim1Literal ? (long) Double.parseDouble(_outDim1) :
+			(sec.getScalarInput(_outDim1, ValueType.DOUBLE, false)).getLongValue());
+		long dim2 = (_dim2Literal ? (long) Double.parseDouble(_outDim2) :
+			(sec.getScalarInput(_outDim2, ValueType.DOUBLE, false)).getLongValue());
+		if( dim1 == -1 && dim2 == -1 ) {
+			//note: if we need to determine the dimensions to we do so before 
+			//creating cells to avoid unnecessary caching, repeated joins, etc.
+			dim1 = (long) RDDAggregateUtils.max(in1);
+			dim2 = ctableOp.hasSecondInput() ? (long) RDDAggregateUtils.max(in2) :
+				sec.getScalarInput(input3).getLongValue();
+		}
+		mcOut.set(dim1, dim2, mc1.getRowsPerBlock(), mc1.getColsPerBlock());
+		mcOut.setNonZerosBound(mc1.getRows());
 		
-		JavaPairRDD<MatrixIndexes, ArrayList<MatrixBlock>> inputMBs = null;
-		JavaPairRDD<MatrixIndexes, CTableMap> ctables = null;
-		JavaPairRDD<MatrixIndexes, Double> bincellsNoFilter = null;
-		boolean setLineage2 = false;
-		boolean setLineage3 = false;
+		//compute preferred degree of parallelism
+		int numParts = Math.max(4 * (mc1.dimsKnown() ?
+			SparkUtils.getNumPreferredPartitions(mc1) : in1.getNumPartitions()),
+			SparkUtils.getNumPreferredPartitions(mcOut));
+		
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
 		switch(ctableOp) {
 			case CTABLE_TRANSFORM: //(VECTOR)
 				// F=ctable(A,B,W) 
-				in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
 				in3 = sec.getBinaryBlockRDDHandleForVariable( input3.getName() );
-				setLineage2 = true;
-				setLineage3 = true;
-				
-				inputMBs = in1.cogroup(in2).cogroup(in3)
-							.mapToPair(new MapThreeMBIterableIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-							scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				out = in1.join(in2, numParts).join(in3, numParts)
+					.mapValues(new MapJoinSignature3())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 			
-				
 			case CTABLE_EXPAND_SCALAR_WEIGHT: //(VECTOR)
-				// F = ctable(seq,A) or F = ctable(seq,B,1)
-				scalar_input3 = sec.getScalarInput(input3.getName(), input3.getValueType(), input3.isLiteral()).getDoubleValue();
-				if(scalar_input3 == 1) {
-					in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-					setLineage2 = true;
-					bincellsNoFilter = in2.flatMapToPair(new ExpandScalarCtableOperation(brlen));
-					break;
-				}
 			case CTABLE_TRANSFORM_SCALAR_WEIGHT: //(VECTOR/MATRIX)
 				// F = ctable(A,B) or F = ctable(A,B,1)
-				in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-				setLineage2 = true;
-
-				scalar_input3 = sec.getScalarInput(input3.getName(), input3.getValueType(), input3.isLiteral()).getDoubleValue();
-				inputMBs = in1.cogroup(in2).mapToPair(new MapTwoMBIterableIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-						scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				s3 = sec.getScalarInput(input3).getDoubleValue();
+				out = in1.join(in2, numParts).mapValues(new MapJoinSignature2())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 				
 			case CTABLE_TRANSFORM_HISTOGRAM: //(VECTOR)
 				// F=ctable(A,1) or F = ctable(A,1,1)
-				scalar_input2 = sec.getScalarInput(input2.getName(), input2.getValueType(), input2.isLiteral()).getDoubleValue();
-				scalar_input3 = sec.getScalarInput(input3.getName(), input3.getValueType(), input3.isLiteral()).getDoubleValue();
-				inputMBs = in1.mapToPair(new MapMBIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-						scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				s2 = sec.getScalarInput(input2).getDoubleValue();
+				s3 = sec.getScalarInput(input3).getDoubleValue();
+				out = in1.mapValues(new MapJoinSignature1())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 				
 			case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: //(VECTOR)
 				// F=ctable(A,1,W)
 				in3 = sec.getBinaryBlockRDDHandleForVariable( input3.getName() );
-				setLineage3 = true;
-				
-				scalar_input2 = sec.getScalarInput(input2.getName(), input2.getValueType(), input2.isLiteral()).getDoubleValue();
-				inputMBs = in1.cogroup(in3).mapToPair(new MapTwoMBIterableIntoAL());
-				
-				ctables = inputMBs.mapToPair(new PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-						scalar_input3, this.instString, (SimpleOperator)_optr, _ignoreZeros));
+				s2 = sec.getScalarInput(input2).getDoubleValue();
+				out = in1.join(in3, numParts).mapValues(new MapJoinSignature2())
+					.mapPartitionsToPair(new CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
 				break;
 			
 			default:
 				throw new DMLRuntimeException("Encountered an invalid ctable operation ("+ctableOp+") while executing instruction: " + this.toString());
 		}
 		
-		// Now perform aggregation on ctables to get binaryCells 
-		if(bincellsNoFilter == null && ctables != null) {
-			bincellsNoFilter =  
-					ctables.values()
-					.flatMapToPair(new ExtractBinaryCellsFromCTable());
-			bincellsNoFilter = RDDAggregateUtils.sumCellsByKeyStable(bincellsNoFilter);
-		}
-		else if(!(bincellsNoFilter != null && ctables == null)) {
-			throw new DMLRuntimeException("Incorrect ctable operation");
-		}
-		
-		// handle known/unknown dimensions
-		long outputDim1 = (_dim1Literal ? (long) Double.parseDouble(_outDim1) : (sec.getScalarInput(_outDim1, ValueType.DOUBLE, false)).getLongValue());
-		long outputDim2 = (_dim2Literal ? (long) Double.parseDouble(_outDim2) : (sec.getScalarInput(_outDim2, ValueType.DOUBLE, false)).getLongValue());
-		MatrixCharacteristics mcBinaryCells = null;
-		boolean findDimensions = (outputDim1 == -1 && outputDim2 == -1); 
-		
-		if( !findDimensions ) {
-			if((outputDim1 == -1 && outputDim2 != -1) || (outputDim1 != -1 && outputDim2 == -1))
-				throw new DMLRuntimeException("Incorrect output dimensions passed to TernarySPInstruction:" + outputDim1 + " " + outputDim2);
-			else 
-				mcBinaryCells = new MatrixCharacteristics(outputDim1, outputDim2, brlen, bclen);	
-			
-			// filtering according to given dimensions
-			bincellsNoFilter = bincellsNoFilter
-					.filter(new FilterCells(mcBinaryCells.getRows(), mcBinaryCells.getCols()));
-		}
-		
-		// convert double values to matrix cell
-		JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = bincellsNoFilter
-				.mapToPair(new ConvertToBinaryCell());
-		
-		// find dimensions if necessary (w/ cache for reblock)
-		if( findDimensions ) {						
-			binaryCells = SparkUtils.cacheBinaryCellRDD(binaryCells);
-			mcBinaryCells = SparkUtils.computeMatrixCharacteristics(binaryCells);
-		}
+		//perform fused aggregation and reblock
+		out = out.union(SparkUtils.getEmptyBlockRDD(sec.getSparkContext(), mcOut));
+		out = RDDAggregateUtils.sumByKeyStable(out, numParts, false);
 		
 		//store output rdd handle
-		sec.setRDDHandleForVariable(output.getName(), binaryCells);
-		mcOut.set(mcBinaryCells);
-		// Since we are outputing binary cells, we set block sizes = -1
-		mcOut.setRowsPerBlock(-1); mcOut.setColsPerBlock(-1);
+		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
-		if(setLineage2)
+		if( ctableOp.hasSecondInput() )
 			sec.addLineageRDD(output.getName(), input2.getName());
-		if(setLineage3)
+		if( ctableOp.hasThirdInput() )
 			sec.addLineageRDD(output.getName(), input3.getName());
-	}	
-
-	private static class ExpandScalarCtableOperation implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, Double> 
-	{
-		private static final long serialVersionUID = -12552669148928288L;
-	
-		private int _brlen;
-		
-		public ExpandScalarCtableOperation(int brlen) {
-			_brlen = brlen;
-		}
-
-		@Override
-		public Iterator<Tuple2<MatrixIndexes, Double>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
-			throws Exception 
-		{
-			MatrixIndexes ix = arg0._1();
-			MatrixBlock mb = arg0._2(); //col-vector
-			
-			//create an output cell per matrix block row (aligned w/ original source position)
-			ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new ArrayList<>();
-			CTable ctab = CTable.getCTableFnObject();
-			for( int i=0; i<mb.getNumRows(); i++ )
-			{
-				//compute global target indexes (via ctable obj for error handling consistency)
-				long row = UtilFunctions.computeCellIndex(ix.getRowIndex(), _brlen, i);
-				double v2 = mb.quickGetValue(i, 0);
-				Pair<MatrixIndexes,Double> p = ctab.execute(row, v2, 1.0);
-				
-				//indirect construction over pair to avoid tuple2 dependency in general ctable obj
-				if( p.getKey().getRowIndex() >= 1 ) //filter rejected entries
-					retVal.add(new Tuple2<>(p.getKey(), p.getValue()));
-			}
-			
-			return retVal.iterator();
-		}
-	}
-	
-	private static class MapTwoMBIterableIntoAL implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,Iterable<MatrixBlock>>>, MatrixIndexes, ArrayList<MatrixBlock>> {
-
-		private static final long serialVersionUID = 271459913267735850L;
-
-		private static MatrixBlock extractBlock(Iterable<MatrixBlock> blks, MatrixBlock retVal) throws Exception {
-			for(MatrixBlock blk1 : blks) {
-				if(retVal != null) {
-					throw new Exception("ERROR: More than 1 matrixblock found for one of the inputs at a given index");
-				}
-				retVal = blk1;
-			}
-			if(retVal == null) {
-				throw new Exception("ERROR: No matrixblock found for one of the inputs at a given index");
-			}
-			return retVal;
-		}
-		
-		@Override
-		public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-				Tuple2<MatrixIndexes, Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>>> kv)
-				throws Exception {
-			MatrixBlock in1 = null; MatrixBlock in2 = null;
-			in1 = extractBlock(kv._2._1, in1);
-			in2 = extractBlock(kv._2._2, in2);
-			// Now return unflatten AL
-			ArrayList<MatrixBlock> inputs = new ArrayList<>();
-			inputs.add(in1); inputs.add(in2);  
-			return new Tuple2<>(kv._1, inputs);
-		}
-		
 	}
-	
-	private static class MapThreeMBIterableIntoAL implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<Tuple2<Iterable<MatrixBlock>,Iterable<MatrixBlock>>>,Iterable<MatrixBlock>>>, MatrixIndexes, ArrayList<MatrixBlock>> {
-
-		private static final long serialVersionUID = -4873754507037646974L;
-		
-		private static MatrixBlock extractBlock(Iterable<MatrixBlock> blks, MatrixBlock retVal) throws Exception {
-			for(MatrixBlock blk1 : blks) {
-				if(retVal != null) {
-					throw new Exception("ERROR: More than 1 matrixblock found for one of the inputs at a given index");
-				}
-				retVal = blk1;
-			}
-			if(retVal == null) {
-				throw new Exception("ERROR: No matrixblock found for one of the inputs at a given index");
-			}
-			return retVal;
-		}
-
-		@Override
-		public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-				Tuple2<MatrixIndexes, Tuple2<Iterable<Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>>>, Iterable<MatrixBlock>>> kv)
-				throws Exception {
-			MatrixBlock in1 = null; MatrixBlock in2 = null; MatrixBlock in3 = null;
-			
-			for(Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>> blks : kv._2._1) {
-				in1 = extractBlock(blks._1, in1);
-				in2 = extractBlock(blks._2, in2);
-			}
-			in3 = extractBlock(kv._2._2, in3);
-			
-			// Now return unflatten AL
-			ArrayList<MatrixBlock> inputs = new ArrayList<>();
-			inputs.add(in1); inputs.add(in2); inputs.add(in3);  
-			return new Tuple2<>(kv._1, inputs);
-		}
-		
-	}
-	
-	private static class PerformCTableMapSideOperation implements PairFunction<Tuple2<MatrixIndexes,ArrayList<MatrixBlock>>, MatrixIndexes, CTableMap> {
 
+	private static class CTableFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>>, MatrixIndexes, MatrixBlock> 
+	{
 		private static final long serialVersionUID = 5348127596473232337L;
 
-		Ctable.OperationTypes ctableOp;
-		double scalar_input2; double scalar_input3;
-		String instString;
-		Operator optr;
-		boolean ignoreZeros;
+		private final Ctable.OperationTypes _ctableOp;
+		private final double _scalar_input2, _scalar_input3;
+		private final boolean _ignoreZeros;
+		private final long _dim1, _dim2;
+		private final int _brlen, _bclen;
 		
-		public PerformCTableMapSideOperation(Ctable.OperationTypes ctableOp, double scalar_input2, double scalar_input3, String instString, Operator optr, boolean ignoreZeros) {
-			this.ctableOp = ctableOp;
-			this.scalar_input2 = scalar_input2;
-			this.scalar_input3 = scalar_input3;
-			this.instString = instString;
-			this.optr = optr;
-			this.ignoreZeros = ignoreZeros;
+		public CTableFunction(Ctable.OperationTypes ctableOp, double s2, double s3, boolean ignoreZeros, MatrixCharacteristics mcOut) {
+			this(ctableOp, s2, s3, ignoreZeros, false, mcOut);
 		}
 		
-		private static void expectedALSize(int length, ArrayList<MatrixBlock> al) throws Exception {
-			if(al.size() != length) {
-				throw new Exception("Expected arraylist of size:" + length + ", but found " + al.size());
-			}
+		public CTableFunction(Ctable.OperationTypes ctableOp, double s2, double s3, boolean ignoreZeros, boolean emitEmpty, MatrixCharacteristics mcOut) {
+			_ctableOp = ctableOp;
+			_scalar_input2 = s2;
+			_scalar_input3 = s3;
+			_ignoreZeros = ignoreZeros;
+			_dim1 = mcOut.getRows();
+			_dim2 = mcOut.getCols();
+			_brlen = mcOut.getRowsPerBlock();
+			_bclen = mcOut.getColsPerBlock();
 		}
 		
 		@Override
-		public Tuple2<MatrixIndexes, CTableMap> call(
-				Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> kv) throws Exception {
-			CTableMap ctableResult = new CTableMap();
-			MatrixBlock ctableResultBlock = null;
-			
-			IndexedMatrixValue in1, in2, in3 = null;
-			in1 = new IndexedMatrixValue(kv._1, kv._2.get(0));
-			MatrixBlock matBlock1 = kv._2.get(0);
+		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>> arg0)
+			throws Exception
+		{
+			CTableMap map = new CTableMap(); MatrixBlock block = null;
 			
-			switch( ctableOp )
-			{
-				case CTABLE_TRANSFORM: {
-					in2 = new IndexedMatrixValue(kv._1, kv._2.get(1));
-					in3 = new IndexedMatrixValue(kv._1, kv._2.get(2));
-					expectedALSize(3, kv._2);
-					
-					if(in1==null || in2==null || in3 == null )
-						break;	
-					else
-						OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), in2.getIndexes(),
-							in2.getValue(), in3.getIndexes(), in3.getValue(), ctableResult, ctableResultBlock, optr);
-					break;
-				}
-				case CTABLE_TRANSFORM_SCALAR_WEIGHT: 
-				case CTABLE_EXPAND_SCALAR_WEIGHT:
-				{
-					// 3rd input is a scalar
-					in2 = new IndexedMatrixValue(kv._1, kv._2.get(1));
-					expectedALSize(2, kv._2);
-					if(in1==null || in2==null )
+			//local aggregation of entire partition
+			while( arg0.hasNext() ) {
+				Tuple2<MatrixIndexes,MatrixBlock[]> tmp = arg0.next();
+				MatrixIndexes ix = tmp._1();
+				MatrixBlock[] mb = tmp._2();
+				
+				switch( _ctableOp ) {
+					case CTABLE_TRANSFORM: {
+						OperationsOnMatrixValues.performCtable(ix, mb[0], ix,
+							mb[1], ix, mb[2], map, block, null);
 						break;
-					else
-						matBlock1.ctableOperations((SimpleOperator)optr, kv._2.get(1), scalar_input3, ignoreZeros, ctableResult, ctableResultBlock);
+					}
+					case CTABLE_EXPAND_SCALAR_WEIGHT:
+					case CTABLE_TRANSFORM_SCALAR_WEIGHT: {
+						// 3rd input is a scalar
+						mb[0].ctableOperations(null, mb[1], _scalar_input3, _ignoreZeros, map, block);
 						break;
-				}
-				case CTABLE_TRANSFORM_HISTOGRAM: {
-					expectedALSize(1, kv._2);
-					OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), scalar_input2, 
-							scalar_input3, ctableResult, ctableResultBlock, optr);
-					break;
-				}
-				case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: {
-					// 2nd and 3rd inputs are scalars
-					expectedALSize(2, kv._2);
-					in3 = new IndexedMatrixValue(kv._1, kv._2.get(1)); // Note: kv._2.get(1), not kv._2.get(2)
-					
-					if(in1==null || in3==null)
+					}
+					case CTABLE_TRANSFORM_HISTOGRAM: {
+						OperationsOnMatrixValues.performCtable(ix, mb[0],
+							_scalar_input2, _scalar_input3, map, block, null);
 						break;
-					else
-						OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), scalar_input2, 
-								in3.getIndexes(), in3.getValue(), ctableResult, ctableResultBlock, optr);		
-					break;
+					}
+					case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: {
+						// 2nd and 3rd inputs are scalars
+						OperationsOnMatrixValues.performCtable(ix, mb[0],
+							_scalar_input2, ix, mb[1], map, block, null);
+						break;
+					}
+					default:
+						break;
+				}
+			}
+			
+			ReblockBuffer rbuff = new ReblockBuffer(Math.min(
+				4*1024*1024, map.size()), _dim1, _dim2, _brlen, _bclen);
+			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<>();
+			
+			//append to buffer for blocked output
+			Iterator<ADoubleEntry> iter = map.getIterator();
+			while( iter.hasNext() ) {
+				ADoubleEntry e = iter.next();
+				if( e.getKey1() <= _dim1 && e.getKey2() <= _dim2 ) { 
+					if( rbuff.getSize() >= rbuff.getCapacity() )
+						flushBufferToList(rbuff, ret);
+					rbuff.appendCell(e.getKey1(), e.getKey2(), e.value);
 				}
-				default:
-					throw new DMLRuntimeException("Unrecognized opcode in Tertiary Instruction: " + instString);
 			}
-			return new Tuple2<>(kv._1, ctableResult);
+			
+			//final flush buffer
+			if( rbuff.getSize() > 0 )
+				flushBufferToList(rbuff, ret);
+			
+			return ret.iterator();
 		}
-		
-	}
 	
-	private static class MapMBIntoAL implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, ArrayList<MatrixBlock>> {
-
-		private static final long serialVersionUID = 2068398913653350125L;
-
-		@Override
-		public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-				Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
-			ArrayList<MatrixBlock> retVal = new ArrayList<>();
-			retVal.add(kv._2);
-			return new Tuple2<>(kv._1, retVal);
+		protected void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+			throws IOException, DMLRuntimeException
+		{
+			rbuff.flushBufferToBinaryBlocks().stream() // prevent library dependencies
+				.map(b -> SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b));
 		}
-		
 	}
 	
-	private static class ExtractBinaryCellsFromCTable implements PairFlatMapFunction<CTableMap, MatrixIndexes, Double> {
+	public static class MapJoinSignature1 implements Function<MatrixBlock, MatrixBlock[]> {
+		private static final long serialVersionUID = -8819908424033945028L;
 
-		private static final long serialVersionUID = -5933677686766674444L;
-		
 		@Override
-		public Iterator<Tuple2<MatrixIndexes, Double>> call(CTableMap ctableMap)
-				throws Exception {
-			ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new ArrayList<>();
-			Iterator<ADoubleEntry> iter = ctableMap.getIterator();
-			while( iter.hasNext() ) {
-				ADoubleEntry ijv = iter.next();
-				long i = ijv.getKey1();
-				long j =  ijv.getKey2();
-				double v =  ijv.value;
-				retVal.add(new Tuple2<>(new MatrixIndexes(i, j), v));
-			}
-			return retVal.iterator();
+		public MatrixBlock[] call(MatrixBlock v1) throws Exception {
+			return ArrayUtils.toArray(v1);
 		}
-		
 	}
 	
-	private static class ConvertToBinaryCell implements PairFunction<Tuple2<MatrixIndexes,Double>, MatrixIndexes, MatrixCell> {
-
-		private static final long serialVersionUID = 7481186480851982800L;
-		
+	public static class MapJoinSignature2 implements Function<Tuple2<MatrixBlock,MatrixBlock>, MatrixBlock[]> {
+		private static final long serialVersionUID = 7690448020081435520L;
 		@Override
-		public Tuple2<MatrixIndexes, MatrixCell> call(
-				Tuple2<MatrixIndexes, Double> kv) throws Exception {
-			
-			MatrixCell cell = new MatrixCell(kv._2().doubleValue());
-			return new Tuple2<>(kv._1(), cell);
+		public MatrixBlock[] call(Tuple2<MatrixBlock, MatrixBlock> v1) throws Exception {
+			return ArrayUtils.toArray(v1._1(), v1._2());
 		}
-		
 	}
 	
-	private static class FilterCells implements Function<Tuple2<MatrixIndexes,Double>, Boolean> {
-		private static final long serialVersionUID = 108448577697623247L;
-
-		long rlen; long clen;
-		public FilterCells(long rlen, long clen) {
-			this.rlen = rlen;
-			this.clen = clen;
-		}
-		
+	public static class MapJoinSignature3 implements Function<Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>, MatrixBlock[]> {
+		private static final long serialVersionUID = -5222678882354280164L;
 		@Override
-		public Boolean call(Tuple2<MatrixIndexes, Double> kv) throws Exception {
-			if(kv._1.getRowIndex() <= 0 || kv._1.getColumnIndex() <= 0) {
-				throw new Exception("Incorrect cell values in TernarySPInstruction:" + kv._1);
-			}
-			if(kv._1.getRowIndex() <= rlen && kv._1.getColumnIndex() <= clen) {
-				return true;
-			}
-			return false;
+		public MatrixBlock[] call(Tuple2<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> v1) throws Exception {
+			return ArrayUtils.toArray(v1._1()._1(), v1._1()._2(), v1._2());
 		}
-		
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 6e647ee..29cd567 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -65,7 +65,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
@@ -491,10 +490,8 @@ public class RDDConverterUtils
 		protected void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
 			throws IOException, DMLRuntimeException
 		{
-			//temporary list of indexed matrix values to prevent library dependencies
-			ArrayList<IndexedMatrixValue> rettmp = new ArrayList<>();
-			rbuff.flushBufferToBinaryBlocks(rettmp);
-			ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
+			rbuff.flushBufferToBinaryBlocks().stream() // tuple conversion done here, not in ReblockBuffer (avoids library deps)
+				.map(SparkUtils::fromIndexedMatrixBlock).forEach(ret::add);
 		}
 	}
 
@@ -574,11 +571,11 @@ public class RDDConverterUtils
 	/////////////////////////////////
 	// BINARYCELL-SPECIFIC FUNCTIONS
 
-	private static class BinaryCellToBinaryBlockFunction extends CellToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixCell>>,MatrixIndexes,MatrixBlock> 
+	public static class BinaryCellToBinaryBlockFunction extends CellToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixCell>>,MatrixIndexes,MatrixBlock> 
 	{
 		private static final long serialVersionUID = 3928810989462198243L;
 
-		protected BinaryCellToBinaryBlockFunction(MatrixCharacteristics mc) {
+		public BinaryCellToBinaryBlockFunction(MatrixCharacteristics mc) {
 			super(mc);
 		}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index 77800e4..4871aee 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -389,10 +389,8 @@ public class RDDConverterUtilsExt
 		private static void flushBufferToList( ReblockBuffer rbuff,  ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
 			throws IOException, DMLRuntimeException
 		{
-			//temporary list of indexed matrix values to prevent library dependencies
-			ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>();
-			rbuff.flushBufferToBinaryBlocks(rettmp);
-			ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
+			rbuff.flushBufferToBinaryBlocks().stream() // tuple conversion done here, not in ReblockBuffer (avoids library deps)
+				.map(SparkUtils::fromIndexedMatrixBlock).forEach(ret::add);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
index ecf44b6..e8d0316 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
@@ -73,7 +73,7 @@ public class MatrixPackedCell extends MatrixCell
 	//with corrections
 	@Override
 	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, 
-			MatrixValue newWithCorrection) {
+			MatrixValue newWithCorrection, boolean deep) { // 'correction' and 'deep' are ignored; delegates to the 2-arg variant
 		incrementalAggregate(aggOp, newWithCorrection);
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
index 82b09e0..88a918d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
@@ -127,10 +127,6 @@ public abstract class MatrixValue implements WritableComparable
 	
 	public abstract MatrixValue unaryOperations(UnaryOperator op, MatrixValue result);
 	
-	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) {
-		incrementalAggregate(aggOp, correction, newWithCorrection, true);
-	}
-	
 	public abstract void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep);
 	
 	public abstract void incrementalAggregate(AggregateOperator aggOp, MatrixValue newWithCorrection);

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
index 7f273fb..8d6f2e6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
@@ -23,8 +23,10 @@ package org.apache.sysml.runtime.matrix.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -223,11 +225,11 @@ public class ReblockBuffer
 		_count = 0;
 	}
 
-	public void flushBufferToBinaryBlocks( ArrayList<IndexedMatrixValue> outList ) 
+	public List<IndexedMatrixValue> flushBufferToBinaryBlocks() 
 		throws IOException, DMLRuntimeException
 	{
 		if( _count == 0 )
-			return;
+			return Collections.emptyList();
 		
 		//Step 1) sort reblock buffer (blockwise, no in-block sorting!)
 		Arrays.sort( _buff, 0 ,_count, new ReblockBufferComparator() );
@@ -248,7 +250,8 @@ public class ReblockBuffer
 			}
 		}
 		
-		//Step 3) output blocks 
+		//Step 3) output blocks
+		ArrayList<IndexedMatrixValue> ret = new ArrayList<>();
 		boolean sparse = MatrixBlock.evalSparseFormatInMemory(_brlen, _bclen, _count/numBlocks);
 		MatrixIndexes tmpIx = new MatrixIndexes();
 		MatrixBlock tmpBlock = new MatrixBlock();
@@ -262,7 +265,7 @@ public class ReblockBuffer
 			
 			//output block and switch to next index pair
 			if( bi != cbi || bj != cbj ) {
-				outputBlock(outList, tmpIx, tmpBlock);
+				outputBlock(ret, tmpIx, tmpBlock);
 				cbi = bi;
 				cbj = bj;
 				tmpIx = new MatrixIndexes(bi, bj);
@@ -278,9 +281,9 @@ public class ReblockBuffer
 		}
 		
 		//output last block 
-		outputBlock(outList, tmpIx, tmpBlock);
-		
+		outputBlock(ret, tmpIx, tmpBlock);
 		_count = 0;
+		return ret;
 	}
 
 	private static void outputBlock( OutputCollector<Writable, Writable> out, MatrixIndexes key, TaggedAdaptivePartialBlock value, MatrixBlock block ) 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index f0911bb..1947c00 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -22,8 +22,11 @@ package org.apache.sysml.runtime.util;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -663,4 +666,9 @@ public class UtilFunctions
 			ret.add(element);
 		return ret;
 	}
+	
+	public static <T> Stream<T> getStream(Iterator<T> iter) { // sequential, single-pass stream that consumes iter
+		Iterable<T> iterable = () -> iter; // every iterator() call returns the same iter, so this view is one-shot
+		return StreamSupport.stream(iterable.spliterator(), false);
+	}
 }