You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/03/25 21:40:06 UTC

[2/5] incubator-systemml git commit: [SYSTEMML-396] New multi-threaded cumulative aggregates (e.g., cumsum)

[SYSTEMML-396] New multi-threaded cumulative aggregates (e.g., cumsum)

This patch introduces multi-threaded cumulative aggregates, which is
especially useful for cumsum because the used numerically stable kahan
plus is rather costly. However, the speedup is limited as a significant
fraction of time is spent in output allocation. In experiments over both
dense and sparse, we have seen speedups of 1.5x-3x, depending the used
JDK.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1e8de49c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1e8de49c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1e8de49c

Branch: refs/heads/master
Commit: 1e8de49c6a690e1a130eeac47a6ad541bc33addb
Parents: 3d65cce
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Mar 24 17:14:52 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Mar 25 13:39:04 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/UnaryOp.java     |  29 +-
 src/main/java/org/apache/sysml/lops/Unary.java  |  60 ++--
 .../parfor/opt/OptimizerRuleBased.java          |   5 +-
 .../runtime/instructions/InstructionUtils.java  |  44 +++
 .../cp/BuiltinUnaryCPInstruction.java           |  10 +-
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 306 ++++++++++++++++---
 .../sysml/runtime/matrix/data/MatrixBlock.java  |   5 +-
 .../runtime/matrix/operators/UnaryOperator.java |  14 +-
 8 files changed, 407 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/hops/UnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java b/src/main/java/org/apache/sysml/hops/UnaryOp.java
index fa0e401..e200691 100644
--- a/src/main/java/org/apache/sysml/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java
@@ -22,6 +22,7 @@ package org.apache.sysml.hops;
 import java.util.ArrayList;
 
 import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.hops.Hop.MultiThreadedHop;
 import org.apache.sysml.lops.Aggregate;
 import org.apache.sysml.lops.Aggregate.OperationTypes;
 import org.apache.sysml.lops.CombineUnary;
@@ -48,11 +49,12 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
  * 		Semantic: given a value, perform the operation (independent of other values)
  */
 
-public class UnaryOp extends Hop 
+public class UnaryOp extends Hop implements MultiThreadedHop
 {
-
 	private OpOp1 _op = null;
-
+	
+	private int _maxNumThreads = -1; //-1 for unlimited
+	
 	
 	private UnaryOp() {
 		//default constructor for clone
@@ -98,6 +100,16 @@ public class UnaryOp extends Hop
 	}
 
 	@Override
+	public void setMaxNumThreads( int k ) {
+		_maxNumThreads = k;
+	}
+	
+	@Override
+	public int getMaxNumThreads() {
+		return _maxNumThreads;
+	}
+	
+	@Override
 	public Lop constructLops()
 		throws HopsException, LopsException 
 	{		
@@ -150,8 +162,9 @@ public class UnaryOp extends Hop
 				}
 				else //default unary 
 				{
+					int k = isCumulativeUnaryOperation() ? OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1;					
 					Unary unary1 = new Unary(input.constructLops(), HopsOpOp1LopsU.get(_op), 
-							                 getDataType(), getValueType(), et);
+							                 getDataType(), getValueType(), et, k);
 					setOutputDimensions(unary1);
 					setLineNumbers(unary1);
 					setLops(unary1);
@@ -398,8 +411,9 @@ public class UnaryOp extends Hop
 		}
 		
 		//in-memory cum sum (of partial aggregates)
-		if( TEMP.getOutputParameters().getNumRows()!=1 ){
-			Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP);
+		if( TEMP.getOutputParameters().getNumRows()!=1 ) {
+			int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );					
+			Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k);
 			unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(), clen, brlen, bclen, -1);
 			setLineNumbers(unary1);
 			TEMP = unary1;
@@ -473,7 +487,8 @@ public class UnaryOp extends Hop
 		
 		//in-memory cum sum (of partial aggregates)
 		if( TEMP.getOutputParameters().getNumRows()!=1 ){
-			Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP);
+			int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );					
+			Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k);
 			unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(), clen, brlen, bclen, -1);
 			setLineNumbers(unary1);
 			TEMP = unary1;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/lops/Unary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Unary.java b/src/main/java/org/apache/sysml/lops/Unary.java
index 1f56d66..8ef3e7b 100644
--- a/src/main/java/org/apache/sysml/lops/Unary.java
+++ b/src/main/java/org/apache/sysml/lops/Unary.java
@@ -46,9 +46,12 @@ public class Unary extends Lop
 		NOTSUPPORTED
 	};
 
-	OperationTypes operation;
+	private OperationTypes operation;
+	private Lop valInput;
+	
+	//cp-specific parameters
+	private int _numThreads = 1;
 
-	Lop valInput;
 
 	/**
 	 * Constructor to perform a unary operation with 2 inputs
@@ -109,11 +112,12 @@ public class Unary extends Lop
 	 * @param op
 	 * @throws LopsException 
 	 */
-	public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, ExecType et) 
+	public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, ExecType et, int numThreads) 
 		throws LopsException 
 	{
 		super(Lop.Type.UNARY, dt, vt);
 		init(input1, op, dt, vt, et);
+		_numThreads = numThreads;
 	}
 	
 	public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt) 
@@ -327,28 +331,46 @@ public class Unary extends Lop
 					"Instruction not defined for Unary operation: " + op);
 		}
 	}
+	
+	/**
+	 * 
+	 * @param op
+	 * @return
+	 */
+	public static boolean isCumulativeOp(OperationTypes op) {
+		return op==OperationTypes.CUMSUM
+			|| op==OperationTypes.CUMPROD
+			|| op==OperationTypes.CUMMIN
+			|| op==OperationTypes.CUMMAX;
+	}
+	
+	@Override
 	public String getInstructions(String input1, String output) 
 		throws LopsException 
 	{
+		//sanity check number of operands
+		if( getInputs().size() != 1 ) {
+			throw new LopsException(printErrorLocation() + "Invalid number of operands ("
+					+ getInputs().size() + ") for an Unary opration: " + operation);		
+		}
+		
 		// Unary operators with one input
-		if (this.getInputs().size() == 1) {
-			
-			StringBuilder sb = new StringBuilder();
-			sb.append( getExecType() );
-			sb.append( Lop.OPERAND_DELIMITOR );
-			sb.append( getOpcode() );
-			sb.append( OPERAND_DELIMITOR );
-			sb.append( getInputs().get(0).prepInputOperand(input1));
+		StringBuilder sb = new StringBuilder();
+		sb.append( getExecType() );
+		sb.append( Lop.OPERAND_DELIMITOR );
+		sb.append( getOpcode() );
+		sb.append( OPERAND_DELIMITOR );
+		sb.append( getInputs().get(0).prepInputOperand(input1) );
+		sb.append( OPERAND_DELIMITOR );
+		sb.append( prepOutputOperand(output) );
+		
+		//num threads for cumulative cp ops
+		if( getExecType() == ExecType.CP && isCumulativeOp(operation) ) {
 			sb.append( OPERAND_DELIMITOR );
-			sb.append( this.prepOutputOperand(output));
-			
-			return sb.toString();
-
-		} else {
-			throw new LopsException(this.printErrorLocation() + "Invalid number of operands ("
-					+ this.getInputs().size() + ") for an Unary opration: "
-					+ operation);
+			sb.append( _numThreads );
 		}
+		
+		return sb.toString();
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 98fb6bb..ef6aab9 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -51,6 +51,7 @@ import org.apache.sysml.hops.LiteralOp;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.hops.ParameterizedBuiltinOp;
 import org.apache.sysml.hops.ReorgOp;
+import org.apache.sysml.hops.UnaryOp;
 import org.apache.sysml.hops.rewrite.HopRewriteUtils;
 import org.apache.sysml.hops.rewrite.ProgramRewriteStatus;
 import org.apache.sysml.hops.rewrite.ProgramRewriter;
@@ -1538,7 +1539,9 @@ public class OptimizerRuleBased extends Optimizer
 					if(    ConfigurationManager.isParallelMatrixOperations() 
 						&& h instanceof MultiThreadedHop //abop, datagenop, qop, paramop
 						&& !( h instanceof ParameterizedBuiltinOp //only paramop-grpagg
-							 && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG) )
+							 && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG)
+						&& !( h instanceof UnaryOp //only unaryop-cumulativeagg
+							 && !((UnaryOp)h).isCumulativeUnaryOperation() )      )
 					{
 						MultiThreadedHop mhop = (MultiThreadedHop) h;
 						mhop.setMaxNumThreads(opsK); //set max constraint in hop

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
index 6c5ec95..ae0fd30 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
@@ -43,6 +43,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.functionobjects.And;
 import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinFunctionCode;
 import org.apache.sysml.runtime.functionobjects.CM;
 import org.apache.sysml.runtime.functionobjects.Divide;
 import org.apache.sysml.runtime.functionobjects.Equals;
@@ -79,6 +80,7 @@ import org.apache.sysml.runtime.matrix.operators.LeftScalarOperator;
 import org.apache.sysml.runtime.matrix.operators.RightScalarOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
+import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
 
 
 public class InstructionUtils 
@@ -505,6 +507,48 @@ public class InstructionUtils
 	
 	/**
 	 * 
+	 * @param uop
+	 * @return
+	 */
+	public static AggregateUnaryOperator parseCumulativeAggregateUnaryOperator(UnaryOperator uop)
+	{
+		Builtin f = (Builtin)uop.fn;
+		
+		if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMSUM ) 
+			return parseCumulativeAggregateUnaryOperator("ucumack+") ;
+		else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMPROD ) 
+			return parseCumulativeAggregateUnaryOperator("ucumac*") ;
+		else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMIN ) 
+			return parseCumulativeAggregateUnaryOperator("ucumacmin") ;
+		else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMAX ) 
+			return parseCumulativeAggregateUnaryOperator("ucumacmax" ) ;
+		
+		throw new RuntimeException("Unsupported cumulative aggregate unary operator: "+f.getBuiltinFunctionCode());
+	}
+	
+	/**
+	 * 
+	 * @param uop
+	 * @return
+	 */
+	public static AggregateUnaryOperator parseBasicCumulativeAggregateUnaryOperator(UnaryOperator uop)
+	{
+		Builtin f = (Builtin)uop.fn;
+		
+		if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMSUM ) 
+			return parseBasicAggregateUnaryOperator("uack+") ;
+		else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMPROD ) 
+			return parseCumulativeAggregateUnaryOperator("uac*") ;
+		else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMIN ) 
+			return parseCumulativeAggregateUnaryOperator("uacmin") ;
+		else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMAX ) 
+			return parseCumulativeAggregateUnaryOperator("uacmax" ) ;
+		
+		throw new RuntimeException("Unsupported cumulative aggregate unary operator: "+f.getBuiltinFunctionCode());
+	}
+	
+	/**
+	 * 
 	 * @param opcode
 	 * @return
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
index 2980ef1..2e5bcdf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysml.runtime.instructions.cp;
 
+import java.util.Arrays;
+
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -57,14 +59,18 @@ public abstract class BuiltinUnaryCPInstruction extends UnaryCPInstruction
 		String opcode = null;
 		ValueFunction func = null;
 		
-		if( parts.length==4 ) //print or stop
+		//print or stop or cumulative aggregates
+		if( parts.length==4 ) 
 		{
 			opcode = parts[0];
 			in.split(parts[1]);
 			out.split(parts[2]);
 			func = Builtin.getBuiltinFnObject(opcode);
 			
-			return new ScalarBuiltinCPInstruction(new SimpleOperator(func), in, out, opcode, str);
+			if( Arrays.asList(new String[]{"ucumk+","ucum*","ucummin","ucummax"}).contains(opcode) )
+				return new MatrixBuiltinCPInstruction(new UnaryOperator(func,Integer.parseInt(parts[3])), in, out, opcode, str); 
+			else
+				return new ScalarBuiltinCPInstruction(new SimpleOperator(func), in, out, opcode, str);
 		}
 		else //2+1, general case
 		{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 2b90ca8..e965b7e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -21,9 +21,11 @@ package org.apache.sysml.runtime.matrix.data;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -42,6 +44,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceCol;
 import org.apache.sysml.runtime.functionobjects.ReduceDiag;
 import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
@@ -50,8 +53,10 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
+
 /**
  * MB:
  * Library for matrix aggregations including ak+, uak+ for all
@@ -288,7 +293,7 @@ public class LibMatrixAgg
 	 * @param uop
 	 * @throws DMLRuntimeException
 	 */
-	public static void aggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop) 
+	public static MatrixBlock cumaggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop) 
 		throws DMLRuntimeException
 	{
 		//prepare meta data 
@@ -299,8 +304,7 @@ public class LibMatrixAgg
 		
 		//filter empty input blocks (incl special handling for sparse-unsafe operations)
 		if( in.isEmptyBlock(false) ){
-			aggregateUnaryMatrixEmpty(in, out, aggtype, null);
-			return;
+			return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
 		}	
 		
 		//allocate output arrays (if required)
@@ -310,15 +314,106 @@ public class LibMatrixAgg
 		//Timing time = new Timing(true);
 		
 		if( !in.sparse )
-			aggregateUnaryMatrixDense(in, out, aggtype, uop.fn, null, 0, m);
+			cumaggregateUnaryMatrixDense(in, out, aggtype, uop.fn, null, 0, m);
 		else
-			aggregateUnaryMatrixSparse(in, out, aggtype, uop.fn, null, 0, m);
+			cumaggregateUnaryMatrixSparse(in, out, aggtype, uop.fn, null, 0, m);
 		
 		//cleanup output and change representation (if necessary)
 		out.recomputeNonZeros();
 		out.examSparsity();
 		
 		//System.out.println("uop ("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms.");
+		
+		return out;
+	}
+	
+	/**
+	 * 
+	 * @param in
+	 * @param out
+	 * @param uop
+	 * @param k
+	 * @return
+	 * @throws DMLRuntimeException
+	 */
+	public static MatrixBlock cumaggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop, int k) 
+		throws DMLRuntimeException
+	{
+		//fall back to sequential version if necessary
+		if(    k <= 1 || (long)in.rlen*in.clen < PAR_NUMCELL_THRESHOLD || in.rlen <= k
+			|| out.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD ) {
+			return cumaggregateUnaryMatrix(in, out, uop);
+		}
+		
+		//prepare meta data 
+		AggType aggtype = getAggType(uop);
+		final int m = in.rlen;
+		final int m2 = out.rlen;
+		final int n2 = out.clen;
+		final int mk = aggtype==AggType.CUM_KAHAN_SUM?2:1;
+		
+		//filter empty input blocks (incl special handling for sparse-unsafe operations)
+		if( in.isEmptyBlock(false) ){
+			return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
+		}	
+
+		//Timing time = new Timing(true);
+		
+		//allocate output arrays (if required)
+		out.reset(m2, n2, false); //always dense
+		out.allocateDenseBlock();
+		
+		//core multi-threaded unary aggregate computation
+		//(currently: always parallelization over number of rows)
+		try {
+			ExecutorService pool = Executors.newFixedThreadPool( k );
+			int blklen = (int)(Math.ceil((double)m/k));
+			
+			//step 1: compute aggregates per row partition
+			AggregateUnaryOperator uaop = InstructionUtils.parseBasicCumulativeAggregateUnaryOperator(uop);
+			AggType uaoptype = getAggType(uaop);
+			ArrayList<PartialAggTask> tasks = new ArrayList<PartialAggTask>();
+			for( int i=0; i<k & i*blklen<m; i++ )
+				tasks.add( new PartialAggTask(in, new MatrixBlock(mk,n2,false), uaoptype, uaop, i*blklen, Math.min((i+1)*blklen, m)) );
+			List<Future<Object>> taskret = pool.invokeAll(tasks);	
+			for( Future<Object> task : taskret )
+				task.get(); //check for errors
+			
+			//step 2: cumulative aggregate of partition aggregates
+			MatrixBlock tmp = new MatrixBlock(tasks.size(), n2, false);
+			for( int i=0; i<tasks.size(); i++ ) {
+				MatrixBlock row = tasks.get(i).getResult();
+				if( uaop.aggOp.correctionExists )
+					row.dropLastRowsOrColums(uaop.aggOp.correctionLocation);
+				tmp.leftIndexingOperations(row, i, i, 0, n2-1, tmp, true);
+			}
+			MatrixBlock tmp2 = cumaggregateUnaryMatrix(tmp, new MatrixBlock(tasks.size(), n2, false), uop);
+			
+			//step 3: compute final cumulative aggregate
+			ArrayList<CumAggTask> tasks2 = new ArrayList<CumAggTask>();
+			for( int i=0; i<k & i*blklen<m; i++ ) {
+				double[] agg = (i==0)? new double[n2] : 
+					DataConverter.convertToDoubleVector(tmp2.sliceOperations(i-1, i-1, 0, n2-1, new MatrixBlock()));
+				tasks2.add( new CumAggTask(in, agg, out, aggtype, uop, i*blklen, Math.min((i+1)*blklen, m)) );
+			}
+			List<Future<Long>> taskret2 = pool.invokeAll(tasks2);	
+			pool.shutdown();
+			
+			//step 4: aggregate nnz
+			out.nonZeros = 0; 
+			for( Future<Long> task : taskret2 )
+				out.nonZeros += task.get();
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		
+		//cleanup output and change representation (if necessary)
+		out.examSparsity();
+		
+		//System.out.println("uop k="+k+" ("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms.");
+		
+		return out;
 	}
 	
 	/**
@@ -1360,19 +1455,19 @@ public class LibMatrixAgg
 			{
 				KahanObject kbuff = new KahanObject(0, 0);
 				KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
-				d_ucumkp(a, c, m, n, kbuff, kplus);
+				d_ucumkp(a, null, c, m, n, kbuff, kplus, rl, ru);
 				break;
 			}
 			case CUM_PROD: //CUMPROD
 			{
-				d_ucumm(a, c, m, n);
+				d_ucumm(a, null, c, m, n, rl, ru);
 				break;
 			}
 			case CUM_MIN:
 			case CUM_MAX:
 			{
 				double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
-				d_ucummxx(a, c, m, n, init, (Builtin)vFn);
+				d_ucummxx(a, null, c, m, n, init, (Builtin)vFn, rl, ru);
 				break;
 			}
 			case MIN: 
@@ -1490,19 +1585,19 @@ public class LibMatrixAgg
 			{
 				KahanObject kbuff = new KahanObject(0, 0);
 				KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
-				s_ucumkp(a, c, m, n, kbuff, kplus);
+				s_ucumkp(a, null, c, m, n, kbuff, kplus, rl, ru);
 				break;
 			}
 			case CUM_PROD: //CUMPROD
 			{
-				s_ucumm(a, c, m, n);
+				s_ucumm(a, null, c, m, n, rl, ru);
 				break;
 			}
 			case CUM_MIN:
 			case CUM_MAX:
 			{
 				double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
-				s_ucummxx(a, c, m, n, init, (Builtin)vFn);
+				s_ucummxx(a, null, c, m, n, init, (Builtin)vFn, rl, ru);
 				break;
 			}
 			case MIN:
@@ -1569,6 +1664,100 @@ public class LibMatrixAgg
 				throw new DMLRuntimeException("Unsupported aggregation type: "+optype);
 		}
 	}
+	
+	/**
+	 * 
+	 * @param in
+	 * @param out
+	 * @param optype
+	 * @param vFn
+	 * @param agg
+	 * @param rl
+	 * @param ru
+	 * @throws DMLRuntimeException
+	 */
+	private static void cumaggregateUnaryMatrixDense(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, double[] agg, int rl, int ru) 
+			throws DMLRuntimeException
+	{
+		final int m = in.rlen;
+		final int n = in.clen;
+		
+		double[] a = in.getDenseBlock();
+		double[] c = out.getDenseBlock();		
+		
+		switch( optype )
+		{
+			case CUM_KAHAN_SUM: //CUMSUM
+			{
+				KahanObject kbuff = new KahanObject(0, 0);
+				KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+				d_ucumkp(a, agg, c, m, n, kbuff, kplus, rl, ru);
+				break;
+			}
+			case CUM_PROD: //CUMPROD
+			{
+				d_ucumm(a, agg, c, m, n, rl, ru);
+				break;
+			}
+			case CUM_MIN:
+			case CUM_MAX:
+			{
+				double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
+				d_ucummxx(a, agg, c, m, n, init, (Builtin)vFn, rl, ru);
+				break;
+			}
+			
+			default:
+				throw new DMLRuntimeException("Unsupported cumulative aggregation type: "+optype);
+		}
+	}
+	
+	/**
+	 * 
+	 * @param in
+	 * @param out
+	 * @param optype
+	 * @param vFn
+	 * @param ixFn
+	 * @param rl
+	 * @param ru
+	 * @throws DMLRuntimeException
+	 */
+	private static void cumaggregateUnaryMatrixSparse(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, double[] agg, int rl, int ru) 
+			throws DMLRuntimeException
+	{
+		final int m = in.rlen;
+		final int n = in.clen;
+		
+		SparseBlock a = in.getSparseBlock();
+		double[] c = out.getDenseBlock();
+		
+		switch( optype )
+		{
+			case CUM_KAHAN_SUM: //CUMSUM
+			{
+				KahanObject kbuff = new KahanObject(0, 0);
+				KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+				s_ucumkp(a, agg, c, m, n, kbuff, kplus, rl, ru);
+				break;
+			}
+			case CUM_PROD: //CUMPROD
+			{
+				s_ucumm(a, agg, c, m, n, rl, ru);
+				break;
+			}
+			case CUM_MIN:
+			case CUM_MAX:
+			{
+				double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
+				s_ucummxx(a, agg, c, m, n, init, (Builtin)vFn, rl, ru);
+				break;
+			}
+
+			default:
+				throw new DMLRuntimeException("Unsupported cumulative aggregation type: "+optype);
+		}
+	}
 
 	/**
 	 * 
@@ -1578,7 +1767,7 @@ public class LibMatrixAgg
 	 * @param ixFn
 	 * @throws DMLRuntimeException 
 	 */
-	private static void aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock out, AggType optype, IndexFunction ixFn) 
+	private static MatrixBlock aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock out, AggType optype, IndexFunction ixFn) 
 		throws DMLRuntimeException
 	{
 		//do nothing for pseudo sparse-safe operations
@@ -1587,7 +1776,7 @@ public class LibMatrixAgg
 				|| optype == AggType.CUM_KAHAN_SUM || optype == AggType.CUM_PROD
 				|| optype == AggType.CUM_MIN || optype == AggType.CUM_MAX)
 		{
-			return;
+			return out;
 		}
 		
 		//compute result based on meta data only
@@ -1639,6 +1828,8 @@ public class LibMatrixAgg
 			default:
 				throw new DMLRuntimeException("Unsupported aggregation type: "+optype);
 		}
+		
+		return out;
 	}
 	
 	
@@ -1785,13 +1976,15 @@ public class LibMatrixAgg
 	 * @param kbuff
 	 * @param kplus
 	 */
-	private static void d_ucumkp( double[] a, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus ) 
+	private static void d_ucumkp( double[] a, double[] agg, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) 
 	{
 		//init current row sum/correction arrays w/ neutral 0
-		double[] csums = new double[ 2*n ]; 
+		double[] csums = new double[ 2*n ];
+		if( agg != null )
+			System.arraycopy(agg, 0, csums, 0, n);
 
 		//scan once and compute prefix sums
-		for( int i=0, aix=0; i<m; i++, aix+=n ) {
+		for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) {
 			sumAgg( a, csums, aix, 0, n, kbuff, kplus );
 			System.arraycopy(csums, 0, c, aix, n);	
 		}
@@ -1807,14 +2000,15 @@ public class LibMatrixAgg
 	 * @param kbuff
 	 * @param kplus
 	 */
-	private static void d_ucumm( double[] a, double[] c, int m, int n ) 
+	private static void d_ucumm( double[] a, double[] agg, double[] c, int m, int n, int rl, int ru ) 
 	{	
 		//init current row product array w/ neutral 1
-		double[] cprods = new double[ n ]; 
-		Arrays.fill(cprods, 1);
+		double[] cprods = (agg!=null) ? agg : new double[ n ]; 
+		if( agg == null )
+			Arrays.fill(cprods, 1);
 		
 		//scan once and compute prefix products
-		for( int i=0, aix=0; i<m; i++, aix+=n ) {
+		for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) {
 			productAgg( a, cprods, aix, 0, n );
 			System.arraycopy(cprods, 0, c, aix, n);
 		}			
@@ -1829,14 +2023,15 @@ public class LibMatrixAgg
 	 * @param n
 	 * @param builtin
 	 */
-	private static void d_ucummxx( double[] a, double[] c, int m, int n, double init, Builtin builtin )
+	private static void d_ucummxx( double[] a, double[] agg, double[] c, int m, int n, double init, Builtin builtin, int rl, int ru )
 	{
 		//init current row min/max array w/ extreme value 
-		double[] cmxx = new double[ n ]; 
-		Arrays.fill(cmxx, init);
+		double[] cmxx = (agg!=null) ? agg : new double[ n ]; 
+		if( agg == null )
+			Arrays.fill(cmxx, init);
 				
 		//scan once and compute prefix min/max
-		for( int i=0, aix=0; i<m; i++, aix+=n ) {
+		for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) {
 			builtinAgg( a, cmxx, aix, n, builtin );
 			System.arraycopy(cmxx, 0, c, aix, n);
 		}
@@ -2279,13 +2474,15 @@ public class LibMatrixAgg
 	 * @param kbuff
 	 * @param kplus
 	 */
-	private static void s_ucumkp( SparseBlock a, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus )
+	private static void s_ucumkp( SparseBlock a, double[] agg, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru )
 	{
 		//init current row sum/correction arrays w/ neutral 0
 		double[] csums = new double[ 2*n ]; 
-
+		if( agg != null )
+			System.arraycopy(agg, 0, csums, 0, n);
+		
 		//scan once and compute prefix sums
-		for( int i=0, ix=0; i<m; i++, ix+=n ) {
+		for( int i=rl, ix=rl*n; i<ru; i++, ix+=n ) {
 			if( !a.isEmpty(i) )
 				sumAgg( a.values(i), csums, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplus );
 
@@ -2302,17 +2499,18 @@ public class LibMatrixAgg
 	 * @param m
 	 * @param n
 	 */
-	private static void s_ucumm( SparseBlock a, double[] c, int m, int n )
+	private static void s_ucumm( SparseBlock a, double[] agg, double[] c, int m, int n, int rl, int ru )
 	{
 		//init current row prod arrays w/ neutral 1
-		double[] cprod = new double[ n ]; 
-		Arrays.fill(cprod, 1);
+		double[] cprod = (agg!=null) ? agg : new double[ n ]; 
+		if( agg == null )
+			Arrays.fill(cprod, 1);
 		
 		//init count arrays (helper, see correction)
 		int[] cnt = new int[ n ]; 
 
 		//scan once and compute prefix products
-		for( int i=0, ix=0; i<m; i++, ix+=n )
+		for( int i=rl, ix=rl*n; i<ru; i++, ix+=n )
 		{
 			//multiply row of non-zero elements
 			if( !a.isEmpty(i) ) {
@@ -2345,11 +2543,12 @@ public class LibMatrixAgg
 	 * @param init
 	 * @param builtin
 	 */
-	private static void s_ucummxx( SparseBlock a, double[] c, int m, int n, double init, Builtin builtin ) 
+	private static void s_ucummxx( SparseBlock a, double[] agg, double[] c, int m, int n, double init, Builtin builtin, int rl, int ru ) 
 	{
 		//init current row min/max array w/ extreme value 
-		double[] cmxx = new double[ n ]; 
-		Arrays.fill(cmxx, init);
+		double[] cmxx = (agg!=null) ? agg : new double[ n ]; 
+		if( agg == null )
+			Arrays.fill(cmxx, init);
 				
 		//init count arrays (helper, see correction)
 		int[] cnt = new int[ n ]; 
@@ -3434,6 +3633,45 @@ public class LibMatrixAgg
 			return _ret;
 		}
 	}
+
+	/**
+	 * 
+	 */
+	private static class CumAggTask implements Callable<Long> 
+	{
+		private MatrixBlock _in  = null;
+		private double[] _agg = null;
+		private MatrixBlock _ret = null;
+		private AggType _aggtype = null;
+		private UnaryOperator _uop = null;		
+		private int _rl = -1;
+		private int _ru = -1;
+
+		protected CumAggTask( MatrixBlock in, double[] agg, MatrixBlock ret, AggType aggtype, UnaryOperator uop, int rl, int ru ) 
+			throws DMLRuntimeException
+		{
+			_in = in;			
+			_agg = agg;
+			_ret = ret;
+			_aggtype = aggtype;
+			_uop = uop;
+			_rl = rl;
+			_ru = ru;
+		}
+		
+		@Override
+		public Long call() throws DMLRuntimeException
+		{
+			//compute partial cumulative aggregate
+			if( !_in.sparse )
+				cumaggregateUnaryMatrixDense(_in, _ret, _aggtype, _uop.fn, _agg, _rl, _ru);
+			else
+				cumaggregateUnaryMatrixSparse(_in, _ret, _aggtype, _uop.fn, _agg, _rl, _ru);
+			
+			//recompute partial non-zeros (ru exlusive)
+			return _ret.recomputeNonZeros(_rl, _ru-1, 0, _ret.getNumColumns()-1);
+		}		
+	}
 	
 	/**
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 6116fab..06ed711 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -2823,7 +2823,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		if( LibMatrixAgg.isSupportedUnaryOperator(op) ) 
 		{
 			//e.g., cumsum/cumprod/cummin/cumax
-			LibMatrixAgg.aggregateUnaryMatrix(this, ret, op);
+			if( op.getNumThreads() > 1 )
+				LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads());
+			else
+				LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
 		}
 		else
 		{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
index 1a45794..2f65fb4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
@@ -25,14 +25,20 @@ import org.apache.sysml.runtime.functionobjects.ValueFunction;
 
 public class UnaryOperator extends Operator 
 {
-
 	private static final long serialVersionUID = 2441990876648978637L;
 
 	public ValueFunction fn;
-	public UnaryOperator(ValueFunction p)
+	private int k; //num threads
+
+	public UnaryOperator(ValueFunction p) {
+		this(p, 1); //default single-threaded
+	}
+	
+	public UnaryOperator(ValueFunction p, int numThreads)
 	{
 		fn = p;
 		sparseSafe = false;
+		k = numThreads;
 		
 		if(fn instanceof Builtin)
 		{
@@ -47,4 +53,8 @@ public class UnaryOperator extends Operator
 			}
 		}
 	}
+	
+	public int getNumThreads() {
+		return k;
+	}
 }