You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/15 06:53:20 UTC

[1/2] incubator-systemml git commit: [SYSTEMML-1525, 1526] Fix codegen size propagation and cplan cleanup

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 149562eca -> 0d625a05e


[SYSTEMML-1525,1526] Fix codegen size propagation and cplan cleanup

This patch fixes the output size propagation of row and multi-aggregate
templates, as well as the plan cleanup of multi-aggregate templates whose
output nodes can themselves be indexing operations.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1cd62866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1cd62866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1cd62866

Branch: refs/heads/master
Commit: 1cd62866b7848bdb5eeded39259cd23b0ff172c8
Parents: 149562e
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Apr 14 20:57:51 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 14 20:57:51 2017 -0700

----------------------------------------------------------------------
 .../sysml/hops/codegen/SpoofCompiler.java       | 53 +++++++++++++-------
 .../apache/sysml/hops/codegen/SpoofFusedOp.java |  5 ++
 .../sysml/hops/codegen/cplan/CNodeMultiAgg.java |  2 +-
 .../sysml/hops/codegen/cplan/CNodeRow.java      | 11 ++--
 .../hops/codegen/template/TemplateUtils.java    | 13 ++++-
 5 files changed, 61 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1cd62866/src/main/java/org/apache/sysml/hops/codegen/SpoofCompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/codegen/SpoofCompiler.java b/src/main/java/org/apache/sysml/hops/codegen/SpoofCompiler.java
index 1f0644b..43b88b0 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/SpoofCompiler.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/SpoofCompiler.java
@@ -583,21 +583,10 @@ public class SpoofCompiler
 						tmp.toArray(new Hop[0]),tpl));
 			}
 			
-			//remove spurious lookups on main input of cell template
-			if( tpl instanceof CNodeCell || tpl instanceof CNodeOuterProduct ) {
-				CNodeData in1 = (CNodeData)tpl.getInput().get(0);
-				rFindAndRemoveLookup(tpl.getOutput(), in1);
-			}
-			else if( tpl instanceof CNodeMultiAgg ) {
-				CNodeData in1 = (CNodeData)tpl.getInput().get(0);
-				for( CNode output : ((CNodeMultiAgg)tpl).getOutputs() )
-					rFindAndRemoveLookup(output, in1);
-			}
-			
 			//remove invalid plans with column indexing on main input
 			if( tpl instanceof CNodeCell ) {
 				CNodeData in1 = (CNodeData)tpl.getInput().get(0);
-				if( rHasLookupRC1(tpl.getOutput(), in1) ) {
+				if( rHasLookupRC1(tpl.getOutput(), in1) || isLookupRC1(tpl.getOutput(), in1) ) {
 					cplans2.remove(e.getKey());
 					if( LOG.isTraceEnabled() )
 						LOG.trace("Removed cplan due to invalid rc1 indexing on main input.");
@@ -606,16 +595,26 @@ public class SpoofCompiler
 			else if( tpl instanceof CNodeMultiAgg ) {
 				CNodeData in1 = (CNodeData)tpl.getInput().get(0);
 				for( CNode output : ((CNodeMultiAgg)tpl).getOutputs() )
-					if( rHasLookupRC1(output, in1) ) {
+					if( rHasLookupRC1(output, in1) || isLookupRC1(output, in1) ) {
 						cplans2.remove(e.getKey());
 						if( LOG.isTraceEnabled() )
 							LOG.trace("Removed cplan due to invalid rc1 indexing on main input.");
 					}
 			}
 			
+			//remove spurious lookups on main input of cell template
+			if( tpl instanceof CNodeCell || tpl instanceof CNodeOuterProduct ) {
+				CNodeData in1 = (CNodeData)tpl.getInput().get(0);
+				rFindAndRemoveLookup(tpl.getOutput(), in1);
+			}
+			else if( tpl instanceof CNodeMultiAgg ) {
+				CNodeData in1 = (CNodeData)tpl.getInput().get(0);
+				rFindAndRemoveLookupMultiAgg((CNodeMultiAgg)tpl, in1);
+			}
+			
 			//remove cplan w/ single op and w/o agg
-			if( tpl instanceof CNodeCell && ((CNodeCell)tpl).getCellType()==CellType.NO_AGG
-				&& TemplateUtils.hasSingleOperation(tpl) ) 
+			if( tpl instanceof CNodeCell && ((((CNodeCell)tpl).getCellType()==CellType.NO_AGG
+				&& TemplateUtils.hasSingleOperation(tpl))|| TemplateUtils.hasNoOperation(tpl)) ) 
 				cplans2.remove(e.getKey());
 				
 			//remove cplan if empty
@@ -636,6 +635,20 @@ public class SpoofCompiler
 			rCollectLeafIDs(c, leafs);
 	}
 	
+	private static void rFindAndRemoveLookupMultiAgg(CNodeMultiAgg node, CNodeData mainInput) {
+		//process all outputs individually
+		for( CNode output : node.getOutputs() )
+			rFindAndRemoveLookup(output, mainInput);
+		
+		//handle special case, of lookup being itself the output node
+		for( int i=0; i < node.getOutputs().size(); i++) {
+			CNode tmp = node.getOutputs().get(i);
+			if( TemplateUtils.isLookup(tmp) && tmp.getInput().get(0) instanceof CNodeData
+				&& ((CNodeData)tmp.getInput().get(0)).getHopID()==mainInput.getHopID() )
+				node.getOutputs().set(i, tmp.getInput().get(0));
+		}
+	}
+	
 	private static void rFindAndRemoveLookup(CNode node, CNodeData mainInput) {
 		for( int i=0; i<node.getInput().size(); i++ ) {
 			CNode tmp = node.getInput().get(i);
@@ -653,9 +666,7 @@ public class SpoofCompiler
 		boolean ret = false;
 		for( int i=0; i<node.getInput().size() && !ret; i++ ) {
 			CNode tmp = node.getInput().get(i);
-			if( tmp instanceof CNodeTernary && ((CNodeTernary)tmp).getType()==TernaryType.LOOKUP_RC1 
-				&& tmp.getInput().get(0) instanceof CNodeData
-				&& ((CNodeData)tmp.getInput().get(0)).getHopID() == mainInput.getHopID())
+			if( isLookupRC1(tmp, mainInput) )
 				ret = true;
 			else
 				ret |= rHasLookupRC1(tmp, mainInput);
@@ -663,6 +674,12 @@ public class SpoofCompiler
 		return ret;
 	}
 	
+	private static boolean isLookupRC1(CNode node, CNodeData mainInput) {
+		return (node instanceof CNodeTernary && ((CNodeTernary)node).getType()==TernaryType.LOOKUP_RC1 
+				&& node.getInput().get(0) instanceof CNodeData
+				&& ((CNodeData)node.getInput().get(0)).getHopID() == mainInput.getHopID());
+	}
+	
 	/**
 	 * This plan cache maps CPlans to compiled and loaded classes in order
 	 * to reduce javac and JIT compilation overhead. It uses a simple LRU 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1cd62866/src/main/java/org/apache/sysml/hops/codegen/SpoofFusedOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/codegen/SpoofFusedOp.java b/src/main/java/org/apache/sysml/hops/codegen/SpoofFusedOp.java
index 357d41c..89d205b 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/SpoofFusedOp.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/SpoofFusedOp.java
@@ -41,6 +41,7 @@ public class SpoofFusedOp extends Hop implements MultiThreadedHop
 		COLUMN_DIMS_ROWS,
 		COLUMN_DIMS_COLS,
 		SCALAR,
+		MULTI_SCALAR,
 		ROW_RANK_DIMS, // right wdivmm 
 		COLUMN_RANK_DIMS  // left wdivmm
 	}
@@ -160,6 +161,10 @@ public class SpoofFusedOp extends Hop implements MultiThreadedHop
 				setDim1(0);
 				setDim2(0);
 				break;
+			case MULTI_SCALAR:
+				setDim1(1); //row vector
+				//dim2 statically set from outside
+				break;
 			case ROW_RANK_DIMS:
 				setDim1(getInput().get(0).getDim1());
 				setDim2(getInput().get(1).getDim2());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1cd62866/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeMultiAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeMultiAgg.java b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeMultiAgg.java
index 95e1f75..aa84a00 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeMultiAgg.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeMultiAgg.java
@@ -137,7 +137,7 @@ public class CNodeMultiAgg extends CNodeTpl
 
 	@Override
 	public SpoofOutputDimsType getOutputDimType() {
-		return SpoofOutputDimsType.COLUMN_DIMS_COLS;  //row vector
+		return SpoofOutputDimsType.MULTI_SCALAR;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1cd62866/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeRow.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeRow.java b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeRow.java
index 3cc2e3b..ac2a394 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeRow.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeRow.java
@@ -123,9 +123,14 @@ public class CNodeRow extends CNodeTpl
 
 	@Override
 	public SpoofOutputDimsType getOutputDimType() {
-		return (_output._cols==1) ? 
-			SpoofOutputDimsType.COLUMN_DIMS_ROWS : //column vector
-			SpoofOutputDimsType.COLUMN_DIMS_COLS;  //row vector
+		switch( _type ) {
+			case NO_AGG: return SpoofOutputDimsType.INPUT_DIMS;
+			case ROW_AGG: return SpoofOutputDimsType.ROW_DIMS;
+			case COL_AGG: return SpoofOutputDimsType.COLUMN_DIMS_COLS; //row vector
+			case COL_AGG_T: return SpoofOutputDimsType.COLUMN_DIMS_ROWS; //column vector
+			default:
+				throw new RuntimeException("Unsupported row type: "+_type.toString());
+		}
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1cd62866/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
index 0a19b56..89211db 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
@@ -267,13 +267,19 @@ public class TemplateUtils
 	}
 	
 	public static boolean isLookup(CNode node) {
-		return isUnary(node, UnaryType.LOOKUP_R, UnaryType.LOOKUP_C, UnaryType.LOOKUP_RC);
+		return isUnary(node, UnaryType.LOOKUP_R, UnaryType.LOOKUP_C, UnaryType.LOOKUP_RC)
+			|| isTernary(node, TernaryType.LOOKUP_RC1);
 	}
 	
 	public static boolean isUnary(CNode node, UnaryType...types) {
 		return node instanceof CNodeUnary
 			&& ArrayUtils.contains(types, ((CNodeUnary)node).getType());
 	}
+	
+	public static boolean isTernary(CNode node, TernaryType...types) {
+		return node instanceof CNodeTernary
+			&& ArrayUtils.contains(types, ((CNodeTernary)node).getType());
+	}
 
 	public static CNodeData createCNodeData(Hop hop, boolean compileLiterals) {
 		CNodeData cdata = new CNodeData(hop);
@@ -312,6 +318,11 @@ public class TemplateUtils
 				|| output instanceof CNodeTernary) && hasOnlyDataNodeOrLookupInputs(output);
 	}
 	
+	public static boolean hasNoOperation(CNodeTpl tpl) {
+		return tpl.getOutput() instanceof CNodeData 
+			|| isLookup(tpl.getOutput());
+	}
+	
 	public static boolean hasOnlyDataNodeOrLookupInputs(CNode node) {
 		boolean ret = true;
 		for( CNode c : node.getInput() )


[2/2] incubator-systemml git commit: [SYSTEMML-1512] Performance spark rowwise codegen instructions

Posted by mb...@apache.org.
[SYSTEMML-1512] Performance spark rowwise codegen instructions

This patch makes the following incremental performance improvements to
the Spark instruction for rowwise codegen templates:

(1) Partitioning-preserving operations for row aggregates (row-wise
templates are constrained to clen<=bclen, i.e., a single column block,
so row aggregates always preserve the partitioning)

(2) Reduced allocation of temporary row vectors (temporary row vectors
are now allocated once per partition instead of once per block)

(3) Incremental aggregation for column aggregates (output vectors of
column aggregates are now aggregated incrementally per partition instead
of being allocated per block)

In a scenario with a 10M x 10K dense input matrix and 100 iterations of a
row-wise template with broadcasts and two vector intermediates, changes
(2) and (3) reduced the total runtime from 253s to 204s.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0d625a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0d625a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0d625a05

Branch: refs/heads/master
Commit: 0d625a05e44e8e613651be4bf9ae4b833c087ffd
Parents: 1cd6286
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Apr 14 23:33:10 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 14 23:33:10 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofRowwise.java     | 24 +++++--
 .../instructions/spark/SpoofSPInstruction.java  | 66 +++++++++++++-------
 2 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0d625a05/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index b100a89..8b9fb4d 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -61,6 +61,10 @@ public abstract class SpoofRowwise extends SpoofOperator
 	public RowType getRowType() {
 		return _type;
 	}
+	
+	public int getNumIntermediates() {
+		return _reqVectMem;
+	}
 
 	@Override
 	public String getSpoofType() {
@@ -69,7 +73,13 @@ public abstract class SpoofRowwise extends SpoofOperator
 	
 	@Override
 	public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out)	
-		throws DMLRuntimeException
+		throws DMLRuntimeException 
+	{
+		execute(inputs, scalarObjects, out, true, false);
+	}
+	
+	public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, boolean allocTmp, boolean aggIncr) 
+		throws DMLRuntimeException	
 	{
 		//sanity check
 		if( inputs==null || inputs.size() < 1 || out==null )
@@ -78,23 +88,27 @@ public abstract class SpoofRowwise extends SpoofOperator
 		//result allocation and preparations
 		final int m = inputs.get(0).getNumRows();
 		final int n = inputs.get(0).getNumColumns();
-		allocateOutputMatrix(m, n, out);
+		if( !aggIncr || !out.isAllocated() )
+			allocateOutputMatrix(m, n, out);
 		double[] c = out.getDenseBlock();
 		
 		//input preparation
 		double[][] b = prepInputMatrices(inputs);
 		double[] scalars = prepInputScalars(scalarObjects);
 		
-		//core sequential execute
+		//setup thread-local memory if necessary
+		if( allocTmp )
+			LibSpoofPrimitives.setupThreadLocalMemory(_reqVectMem, n);
 		
-		LibSpoofPrimitives.setupThreadLocalMemory(_reqVectMem, n);
+		//core sequential execute
 		if( !inputs.get(0).isInSparseFormat() )
 			executeDense(inputs.get(0).getDenseBlock(), b, scalars, c, n, 0, m);
 		else
 			executeSparse(inputs.get(0).getSparseBlock(), b, scalars, c, n, 0, m);
 	
 		//post-processing
-		LibSpoofPrimitives.cleanupThreadLocalMemory();
+		if( allocTmp )
+			LibSpoofPrimitives.cleanupThreadLocalMemory();
 		out.recomputeNonZeros();
 		out.examSparsity();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0d625a05/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index be8e849..be3a76d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -31,6 +31,7 @@ import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.codegen.CodegenUtils;
+import org.apache.sysml.runtime.codegen.LibSpoofPrimitives;
 import org.apache.sysml.runtime.codegen.SpoofCellwise;
 import org.apache.sysml.runtime.codegen.SpoofMultiAggregate;
 import org.apache.sysml.runtime.codegen.SpoofCellwise.AggOp;
@@ -198,16 +199,15 @@ public class SpoofSPInstruction extends SPInstruction
 		}
 		else if( _class.getSuperclass() == SpoofRowwise.class ) { //row aggregate operator
 			SpoofRowwise op = (SpoofRowwise) CodegenUtils.createInstance(_class); 	
-			RowwiseFunction fmmc = new RowwiseFunction(_class.getName(), _classBytes, bcMatrices, scalars);
+			RowwiseFunction fmmc = new RowwiseFunction(_class.getName(), _classBytes, bcMatrices, scalars, (int)mcIn.getCols());
+			out = in.mapPartitionsToPair(fmmc, op.getRowType()==RowType.ROW_AGG);
 			
 			if( op.getRowType().isColumnAgg() ) {
-				JavaPairRDD<MatrixIndexes,MatrixBlock> tmpRDD = in.mapToPair(fmmc);
-				MatrixBlock tmpMB = RDDAggregateUtils.sumStable(tmpRDD);		
+				MatrixBlock tmpMB = RDDAggregateUtils.sumStable(out);		
 				sec.setMatrixOutput(_out.getName(), tmpMB);
 			}
 			else //row-agg or no-agg 
 			{
-				out = in.mapToPair(fmmc);
 				if( op.getRowType()==RowType.ROW_AGG && mcIn.getCols() > mcIn.getColsPerBlock() ) {
 					//TODO investigate if some other side effect of correct blocks
 					if( out.partitions().size() > mcIn.getNumRowBlocks() )
@@ -275,27 +275,29 @@ public class SpoofSPInstruction extends SPInstruction
 		}
 	}
 		
-	private static class RowwiseFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
+	private static class RowwiseFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock>>, MatrixIndexes, MatrixBlock> 
 	{
 		private static final long serialVersionUID = -7926980450209760212L;
 
-		private ArrayList<PartitionedBroadcast<MatrixBlock>> _vectors = null;
-		private ArrayList<ScalarObject> _scalars = null;
-		private byte[] _classBytes = null;
-		private String _className = null;
+		private final ArrayList<PartitionedBroadcast<MatrixBlock>> _vectors;
+		private final ArrayList<ScalarObject> _scalars;
+		private final byte[] _classBytes;
+		private final String _className;
+		private final int _clen;
 		private SpoofRowwise _op = null;
 		
-		public RowwiseFunction(String className, byte[] classBytes, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars) 
+		public RowwiseFunction(String className, byte[] classBytes, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars, int clen) 
 			throws DMLRuntimeException
 		{			
 			_className = className;
 			_classBytes = classBytes;
 			_vectors = bcMatrices;
 			_scalars = scalars;
+			_clen = clen;
 		}
 		
 		@Override
-		public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg ) 
 			throws Exception 
 		{
 			//lazy load of shipped class
@@ -304,21 +306,37 @@ public class SpoofSPInstruction extends SPInstruction
 				_op = (SpoofRowwise) CodegenUtils.createInstance(loadedClass); 
 			}
 			
-			//get main input block and indexes
-			MatrixIndexes ixIn = arg0._1();
-			MatrixBlock blkIn = arg0._2();
-			int rowIx = (int)ixIn.getRowIndex();
+			//setup local memory for reuse
+			LibSpoofPrimitives.setupThreadLocalMemory(_op.getNumIntermediates(), _clen);
 			
-			//prepare output and execute single-threaded operator
-			ArrayList<MatrixBlock> inputs = getVectorInputsFromBroadcast(blkIn, rowIx);
-			MatrixIndexes ixOut = new MatrixIndexes(
-				_op.getRowType().isColumnAgg() ? 1 : ixIn.getRowIndex(),
-				_op.getRowType()!=RowType.NO_AGG ? 1 : ixIn.getColumnIndex());
-			MatrixBlock blkOut = new MatrixBlock();
-			_op.execute(inputs, _scalars, blkOut);
+			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+			boolean aggIncr = _op.getRowType().isColumnAgg(); //aggregate entire partition to avoid allocations
+			MatrixBlock blkOut = aggIncr ? new MatrixBlock() : null;
 			
-			//output new tuple
-			return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut);
+			while( arg.hasNext() ) {
+				//get main input block and indexes
+				Tuple2<MatrixIndexes,MatrixBlock> e = arg.next();
+				MatrixIndexes ixIn = e._1();
+				MatrixBlock blkIn = e._2();
+				int rowIx = (int)ixIn.getRowIndex();
+				
+				//prepare output and execute single-threaded operator
+				ArrayList<MatrixBlock> inputs = getVectorInputsFromBroadcast(blkIn, rowIx);
+				blkOut = aggIncr ? blkOut : new MatrixBlock();
+				_op.execute(inputs, _scalars, blkOut, false, aggIncr);
+				if( !aggIncr ) {
+					MatrixIndexes ixOut = new MatrixIndexes(ixIn.getRowIndex(),
+						_op.getRowType()!=RowType.NO_AGG ? 1 : ixIn.getColumnIndex());
+					ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut));
+				}
+			}
+			
+			//cleanup and final result preparations
+			LibSpoofPrimitives.cleanupThreadLocalMemory();
+			if( aggIncr )
+				ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(1,1), blkOut));
+			
+			return ret.iterator();
 		}
 		
 		private ArrayList<MatrixBlock> getVectorInputsFromBroadcast(MatrixBlock blkIn, int rowIndex)