You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2015/12/23 12:03:24 UTC

incubator-systemml git commit: Multi-threaded grouped aggregate (col parallelization), incl refactoring

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 7290510ec -> 039d55c0a


Multi-threaded grouped aggregate (col parallelization), incl refactoring

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/039d55c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/039d55c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/039d55c0

Branch: refs/heads/master
Commit: 039d55c0a02c02e2b7f0df57991a5c985d95503b
Parents: 7290510
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Tue Dec 22 17:32:42 2015 +0100
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Dec 22 17:32:42 2015 +0100

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |  93 +++---
 .../org/apache/sysml/lops/GroupedAggregate.java |  22 +-
 .../parfor/opt/OptimizerRuleBased.java          |   7 +-
 .../cp/ParameterizedBuiltinCPInstruction.java   |   3 +-
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 334 ++++++++++++++++++-
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 241 ++-----------
 .../FullGroupedAggregateMatrixTest.java         |  10 +
 7 files changed, 443 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 51d4ba7..99eebaa 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -22,6 +22,7 @@ package org.apache.sysml.hops;
 import java.util.HashMap;
 import java.util.Map.Entry;
 
+import org.apache.sysml.hops.Hop.MultiThreadedHop;
 import org.apache.sysml.hops.rewrite.HopRewriteUtils;
 import org.apache.sysml.lops.Aggregate;
 import org.apache.sysml.lops.AppendR;
@@ -45,19 +46,19 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  * Defines the HOP for calling an internal function (with custom parameters) from a DML script. 
  * 
  */
-public class ParameterizedBuiltinOp extends Hop 
-{
-	
+public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
+{	
 	private static boolean COMPILE_PARALLEL_REMOVEEMPTY = true;
 	public static boolean FORCE_DIST_RM_EMPTY = false;
 
 	//operator type
 	private ParamBuiltinOp _op;
+
+	private int _maxNumThreads = -1; //-1 for unlimited
 	
 	//removeEmpty hints
 	private boolean _outputEmptyBlocks = true;
 	private boolean _outputPermutationMatrix = false;
-
 	private boolean _bRmEmptyBC = false;
 	
 	/**
@@ -130,6 +131,16 @@ public class ParameterizedBuiltinOp extends Hop
 	}
 	
 	@Override
+	public void setMaxNumThreads( int k ) {
+		_maxNumThreads = k;
+	}
+	
+	@Override
+	public int getMaxNumThreads() {
+		return _maxNumThreads;
+	}
+	
+	@Override
 	public Lop constructLops() 
 		throws HopsException, LopsException 
 	{		
@@ -214,11 +225,31 @@ public class ParameterizedBuiltinOp extends Hop
 		//reset reblock requirement (see MR aggregate / construct lops)
 		setRequiresReblock( false );
 		
+		//determine output dimensions
+		long outputDim1=-1, outputDim2=-1;
+		Lop numGroups = inputlops.get(Statement.GAGG_NUM_GROUPS);
+		if ( !dimsKnown() && numGroups != null && numGroups instanceof Data && ((Data)numGroups).isLiteral() ) {
+			long ngroups = ((Data)numGroups).getLongValue();
+			
+			Lop input = inputlops.get(GroupedAggregate.COMBINEDINPUT);
+			long inDim1 = input.getOutputParameters().getNumRows();
+			long inDim2 = input.getOutputParameters().getNumCols();
+			boolean rowwise = (inDim1==1 && inDim2 > 1 );
+			
+			if( rowwise ) { //vector
+				outputDim1 = ngroups;
+				outputDim2 = 1;
+			}
+			else { //vector or matrix
+				outputDim1 = inDim2;
+				outputDim2 = ngroups;
+			}			
+		}
+		
+		//construct lops
 		if ( et == ExecType.MR ) 
 		{
-			// construct necessary lops: combineBinary/combineTertiary and
-			// groupedAgg
-
+			// construct necessary lops: combineBinary/combineTertiary and groupedAgg
 			boolean isWeighted = (_paramIndexMap.get(Statement.GAGG_WEIGHTS) != null);
 			if (isWeighted) 
 			{
@@ -284,27 +315,6 @@ public class ParameterizedBuiltinOp extends Hop
 				inputlops.remove(Statement.GAGG_GROUPS);
 			}
 			
-			long outputDim1=-1, outputDim2=-1;
-			Lop numGroups = inputlops.get(Statement.GAGG_NUM_GROUPS);
-			if ( !dimsKnown() && numGroups != null && numGroups instanceof Data && ((Data)numGroups).isLiteral() ) {
-				long ngroups = ((Data)numGroups).getLongValue();
-				
-				Lop input = inputlops.get(GroupedAggregate.COMBINEDINPUT);
-				long inDim1 = input.getOutputParameters().getNumRows();
-				long inDim2 = input.getOutputParameters().getNumCols();
-				boolean rowwise = (inDim1==1 && inDim2 > 1 );
-				
-				if( rowwise ) { //vector
-					outputDim1 = ngroups;
-					outputDim2 = 1;
-				}
-				else { //vector or matrix
-					outputDim1 = inDim2;
-					outputDim2 = ngroups;
-				}
-				
-			}
-			
 			GroupedAggregate grp_agg = new GroupedAggregate(inputlops, isWeighted, getDataType(), getValueType());
 			grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, getRowsInBlock(), getColsInBlock(), -1);
 			setLineNumbers(grp_agg);
@@ -312,26 +322,25 @@ public class ParameterizedBuiltinOp extends Hop
 			setLops(grp_agg);
 			setRequiresReblock( true );
 		}
-		else //CP 
+		else //CP/Spark 
 		{
-			GroupedAggregate grp_agg = new GroupedAggregate(inputlops,
-					getDataType(), getValueType(), et);
-			// output dimensions are unknown at compilation time
-			grp_agg.getOutputParameters().setDimensions(-1, -1, -1, -1, -1);
-			grp_agg.setAllPositions(this.getBeginLine(), this.getBeginColumn(), this.getEndLine(), this.getEndColumn());
+			GroupedAggregate grp_agg = null;
 			
-			// introduce a reblock lop only if it is NOT single_node execution
-			if( et == ExecType.CP){
-				//force blocked output in CP (see below)
-				grp_agg.getOutputParameters().setDimensions(-1, 1, getRowsInBlock(), getColsInBlock(), -1);
+			if( et == ExecType.CP) 
+			{
+				int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );
+				grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, k);						
+				grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, getRowsInBlock(), getColsInBlock(), -1);
 			}
-			else if(et == ExecType.SPARK) {
-				grp_agg.getOutputParameters().setDimensions(-1, 1, -1, -1, -1);
+			else if(et == ExecType.SPARK) 
+			{
+				grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et);						
+				grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
 				setRequiresReblock( true );
 			}
-			//grouped agg, w/o reblock in CP
-			setLops(grp_agg);
 			
+			setLineNumbers(grp_agg);
+			setLops(grp_agg);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/GroupedAggregate.java b/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
index 9498e94..f54857d 100644
--- a/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
+++ b/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
@@ -39,8 +39,9 @@ public class GroupedAggregate extends Lop
 	private static final String opcode = "groupedagg";
 	public static final String COMBINEDINPUT = "combinedinput";
 	
-	private boolean _weights = false;
-	
+	private boolean _weights = false;	
+	private int _numThreads = 1;
+
 	/**
 	 * Constructor to perform grouped aggregate.
 	 * inputParameterLops <- parameters required to compute different aggregates (hashmap)
@@ -62,6 +63,14 @@ public class GroupedAggregate extends Lop
 		init(inputParameterLops, dt, vt, et);
 	}
 	
+	public GroupedAggregate(
+			HashMap<String, Lop> inputParameterLops, 
+			DataType dt, ValueType vt, ExecType et, int k) {
+		super(Lop.Type.GroupedAgg, dt, vt);
+		init(inputParameterLops, dt, vt, et);
+		_numThreads = k;
+	}
+	
 	/**
 	 * 
 	 * @param inputParameterLops
@@ -188,8 +197,15 @@ public class GroupedAggregate extends Lop
 			}
 		}
 		
+		if( getExecType()==ExecType.CP ) {
+			sb.append( OPERAND_DELIMITOR );
+			sb.append( "k" );
+			sb.append( Lop.NAME_VALUE_SEPARATOR );
+			sb.append( _numThreads );	
+		}
+		
 		sb.append( OPERAND_DELIMITOR );
-		sb.append( this.prepOutputOperand(output));
+		sb.append( prepOutputOperand(output));
 		
 		return sb.toString();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 9536db1..4eee876 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.hops.AggBinaryOp;
@@ -38,12 +37,14 @@ import org.apache.sysml.hops.FunctionOp;
 import org.apache.sysml.hops.Hop;
 import org.apache.sysml.hops.AggBinaryOp.MMultMethod;
 import org.apache.sysml.hops.Hop.MultiThreadedHop;
+import org.apache.sysml.hops.Hop.ParamBuiltinOp;
 import org.apache.sysml.hops.Hop.ReOrgOp;
 import org.apache.sysml.hops.HopsException;
 import org.apache.sysml.hops.IndexingOp;
 import org.apache.sysml.hops.LeftIndexingOp;
 import org.apache.sysml.hops.LiteralOp;
 import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.hops.ParameterizedBuiltinOp;
 import org.apache.sysml.hops.ReorgOp;
 import org.apache.sysml.hops.rewrite.HopRewriteUtils;
 import org.apache.sysml.hops.rewrite.ProgramRewriteStatus;
@@ -1523,7 +1524,9 @@ public class OptimizerRuleBased extends Optimizer
 					//set degree of parallelism for multi-threaded leaf nodes
 					Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(c.getID());
 					if(    OptimizerUtils.PARALLEL_CP_MATRIX_MULTIPLY 
-						&& h instanceof MultiThreadedHop ) //abop, datagenop, qop
+						&& h instanceof MultiThreadedHop //abop, datagenop, qop, paramop
+						&& !( h instanceof ParameterizedBuiltinOp //only paramop-grpagg
+							 && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG) )
 					{
 						MultiThreadedHop mhop = (MultiThreadedHop) h;
 						mhop.setMaxNumThreads(opsK); //set max constraint in hop

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index f2705cc..bd3153a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -165,7 +165,8 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
 			}
 			
 			// compute the result
-			MatrixBlock soresBlock = (MatrixBlock) (groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr));
+			int k = Integer.parseInt(params.get("k")); //num threads
+			MatrixBlock soresBlock = (MatrixBlock) (groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k));
 			
 			ec.setMatrixOutput(output.getName(), soresBlock);
 			// release locks

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 21a5a6b..1373df3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -30,6 +30,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinFunctionCode;
+import org.apache.sysml.runtime.functionobjects.CM;
 import org.apache.sysml.runtime.functionobjects.IndexFunction;
 import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
@@ -41,9 +42,12 @@ import org.apache.sysml.runtime.functionobjects.ReduceCol;
 import org.apache.sysml.runtime.functionobjects.ReduceDiag;
 import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.CMOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -385,9 +389,89 @@ public class LibMatrixAgg
 		
 		return val;			
 	}
+
+	/**
+	 * 
+	 * @param groups
+	 * @param target
+	 * @param weights
+	 * @param result
+	 * @param numGroups
+	 * @param op
+	 * @throws DMLRuntimeException
+	 */
+	public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op) 
+		throws DMLRuntimeException
+	{
+		if( !(op instanceof CMOperator || op instanceof AggregateOperator) ) {
+			throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate.");
+		}
+		
+		//CM operator for count, mean, variance
+		//note: current support only for column vectors
+		if(op instanceof CMOperator) {
+			CMOperator cmOp = (CMOperator) op;
+			groupedAggregateCM(groups, target, weights, result, numGroups, cmOp, 0, target.clen);
+		}
+		//Aggregate operator for sum (via kahan sum)
+		//note: support for row/column vectors and dense/sparse
+		else if( op instanceof AggregateOperator ) {
+			AggregateOperator aggop = (AggregateOperator) op;
+			groupedAggregateKahanPlus(groups, target, weights, result, numGroups, aggop, 0, target.clen);
+		}		
+	}
 	
 	/**
 	 * 
+	 * @param groups
+	 * @param target
+	 * @param weights
+	 * @param result
+	 * @param numGroups
+	 * @param op
+	 * @param k
+	 * @throws DMLRuntimeException
+	 */
+	public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op, int k) 
+		throws DMLRuntimeException
+	{
+		//fall back to sequential version if necessary
+		boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
+		if( k <= 1 || (long)target.rlen*target.clen < PAR_NUMCELL_THRESHOLD || rowVector ) {
+			groupedAggregate(groups, target, weights, result, numGroups, op);
+			return;
+		}
+		
+		if( !(op instanceof CMOperator || op instanceof AggregateOperator) ) {
+			throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate.");
+		}
+		
+		//preprocessing
+		result.sparse = false;
+		result.allocateDenseBlock();
+		
+		//core multi-threaded grouped aggregate computation
+		//(currently: parallelization over columns to avoid additional memory requirements)
+		try {
+			ExecutorService pool = Executors.newFixedThreadPool( k );
+			ArrayList<GrpAggTask> tasks = new ArrayList<GrpAggTask>();
+			int blklen = (int)(Math.ceil((double)target.clen/k));
+			for( int i=0; i<k & i*blklen<target.clen; i++ )
+				tasks.add( new GrpAggTask(groups, target, weights, result, numGroups, op, i*blklen, Math.min((i+1)*blklen, target.clen)) );
+			pool.invokeAll(tasks);	
+			pool.shutdown();
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		
+		//postprocessing
+		result.recomputeNonZeros();
+		result.examSparsity();
+	}
+		
+	/**
+	 * 
 	 * @param op
 	 * @return
 	 */
@@ -619,6 +703,211 @@ public class LibMatrixAgg
 		return kbuff._sum;
 	}
 	
+
+	/**
+	 * This is a specific implementation for aggregate(fn="sum"), where we use KahanPlus for numerical
+	 * stability. In contrast to other functions of aggregate, this implementation supports row and column
+	 * vectors for target and exploits sparse representations since KahanPlus is sparse-safe.
+	 * 
+	 * @param target
+	 * @param weights
+	 * @param op
+	 * @throws DMLRuntimeException 
+	 */
+	private static void groupedAggregateKahanPlus( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, AggregateOperator aggop, int cl, int cu ) 
+		throws DMLRuntimeException
+	{
+		boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
+		int numCols = (!rowVector) ? target.getNumColumns() : 1;
+		double w = 1; //default weight
+		
+		//skip empty blocks (sparse-safe operation)
+		if( target.isEmptyBlock(false) ) 
+			return;
+		
+		//init group buffers
+		int numCols2 = cu-cl;
+		KahanObject[][] buffer = new KahanObject[numGroups][numCols2];
+		for( int i=0; i<numGroups; i++ )
+			for( int j=0; j<numCols2; j++ )
+				buffer[i][j] = new KahanObject(aggop.initialValue, 0);
+			
+		if( rowVector ) //target is rowvector
+		{	
+			//note: always sequential, no need to respect cl/cu 
+			
+			if( target.sparse ) //SPARSE target
+			{
+				if( target.sparseRows[0]!=null )
+				{
+					int len = target.sparseRows[0].size();
+					int[] aix = target.sparseRows[0].getIndexContainer();
+					double[] avals = target.sparseRows[0].getValueContainer();	
+					for( int j=0; j<len; j++ ) //for each nnz
+					{
+						int g = (int) groups.quickGetValue(aix[j], 0);		
+						if ( g > numGroups )
+							continue;
+						if ( weights != null )
+							w = weights.quickGetValue(aix[j],0);
+						aggop.increOp.fn.execute(buffer[g-1][0], avals[j]*w);						
+					}
+				}
+					
+			}
+			else //DENSE target
+			{
+				for ( int i=0; i < target.getNumColumns(); i++ ) {
+					double d = target.denseBlock[ i ];
+					if( d != 0 ) //sparse-safe
+					{
+						int g = (int) groups.quickGetValue(i, 0);		
+						if ( g > numGroups )
+							continue;
+						if ( weights != null )
+							w = weights.quickGetValue(i,0);
+						// buffer is 0-indexed, whereas range of values for g = [1,numGroups]
+						aggop.increOp.fn.execute(buffer[g-1][0], d*w);
+					}
+				}
+			}
+		}
+		else //column vector or matrix 
+		{
+			if( target.sparse ) //SPARSE target
+			{
+				SparseRow[] a = target.sparseRows;
+				
+				for( int i=0; i < groups.getNumRows(); i++ ) 
+				{
+					int g = (int) groups.quickGetValue(i, 0);		
+					if ( g > numGroups )
+						continue;
+					
+					if( a[i] != null && !a[i].isEmpty() )
+					{
+						int len = a[i].size();
+						int[] aix = a[i].getIndexContainer();
+						double[] avals = a[i].getValueContainer();	
+						int j = (cl==0) ? 0 : a[i].searchIndexesFirstGTE(cl);
+						j = (j>=0) ? j : len;
+						
+						for( ; j<len && aix[j]<cu; j++ ) //for each nnz
+						{
+							if ( weights != null )
+								w = weights.quickGetValue(aix[j],0);
+							aggop.increOp.fn.execute(buffer[g-1][aix[j]-cl], avals[j]*w);						
+						}
+					}
+				}
+			}
+			else //DENSE target
+			{
+				double[] a = target.denseBlock;
+				
+				for( int i=0, aix=0; i < groups.getNumRows(); i++, aix+=numCols ) 
+				{
+					int g = (int) groups.quickGetValue(i, 0);		
+					if ( g > numGroups )
+						continue;
+				
+					for( int j=cl; j < cu; j++ ) {
+						double d = a[ aix+j ];
+						if( d != 0 ) { //sparse-safe
+							if ( weights != null )
+								w = weights.quickGetValue(i,0);
+							// buffer is 0-indexed, whereas range of values for g = [1,numGroups]
+							aggop.increOp.fn.execute(buffer[g-1][j-cl], d*w);
+						}
+					}
+				}
+			}
+		}
+		
+		// extract the results from group buffers
+		for( int i=0; i < numGroups; i++ )
+			for( int j=0; j < numCols2; j++ )
+				result.appendValue(i, j+cl, buffer[i][j]._sum);
+	}
+
+	/**
+	 * 
+	 * @param target
+	 * @param weights
+	 * @param result
+	 * @param cmOp
+	 * @throws DMLRuntimeException
+	 */
+	private static void groupedAggregateCM( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, CMOperator cmOp, int cl, int cu ) 
+		throws DMLRuntimeException
+	{
+		CM cmFn = CM.getCMFnObject(((CMOperator) cmOp).getAggOpType());
+		double w = 1; //default weight
+		
+		//init group buffers
+		int numCols2 = cu-cl;
+		CM_COV_Object[][] cmValues = new CM_COV_Object[numGroups][numCols2];
+		for ( int i=0; i < numGroups; i++ )
+			for( int j=0; j < numCols2; j++  )
+				cmValues[i][j] = new CM_COV_Object();
+		
+		//column vector or matrix
+		if( target.sparse ) //SPARSE target
+		{
+			SparseRow[] a = target.sparseRows;
+			
+			for( int i=0; i < groups.getNumRows(); i++ ) 
+			{
+				int g = (int) groups.quickGetValue(i, 0);		
+				if ( g > numGroups )
+					continue;
+				
+				if( a[i] != null && !a[i].isEmpty() )
+				{
+					int len = a[i].size();
+					int[] aix = a[i].getIndexContainer();
+					double[] avals = a[i].getValueContainer();	
+					int j = (cl==0) ? 0 : a[i].searchIndexesFirstGTE(cl);
+					j = (j>=0) ? j : len;
+					
+					for( ; j<len && aix[j]<cu; j++ ) //for each nnz
+					{
+						if ( weights != null )
+							w = weights.quickGetValue(i, 0);
+						cmFn.execute(cmValues[g-1][aix[j]-cl], avals[j], w);						
+					}
+					//TODO sparse unsafe correction
+				}
+			}
+		}
+		else //DENSE target
+		{
+			double[] a = target.denseBlock;
+			
+			for( int i=0, aix=0; i < groups.getNumRows(); i++, aix+=target.clen ) 
+			{
+				int g = (int) groups.quickGetValue(i, 0);		
+				if ( g > numGroups )
+					continue;
+			
+				for( int j=cl; j<cu; j++ ) {
+					double d = a[ aix+j ]; //sparse unsafe
+					if ( weights != null )
+						w = weights.quickGetValue(i,0);
+					// buffer is 0-indexed, whereas range of values for g = [1,numGroups]
+					cmFn.execute(cmValues[g-1][j-cl], d, w);
+				}
+			}
+		}
+		
+		// extract the required value from each CM_COV_Object
+		for( int i=0; i < numGroups; i++ ) 
+			for( int j=0; j < numCols2; j++ ) {
+				// result is 0-indexed, so is cmValues
+				result.appendValue(i, j, cmValues[i][j+cl].getRequiredResult(cmOp));
+			}			
+	}
+	
 	/**
 	 * 
 	 * @param in
@@ -2861,5 +3150,48 @@ public class LibMatrixAgg
 			return _ret;
 		}
 	}
-}
+	
+	private static class GrpAggTask extends AggTask 
+	{
+		private MatrixBlock _groups  = null;
+		private MatrixBlock _target  = null;
+		private MatrixBlock _weights  = null;
+		private MatrixBlock _ret  = null;
+		private int _numGroups = -1;
+		private Operator _op = null;
+		private int _cl = -1;
+		private int _cu = -1;
 
+		protected GrpAggTask( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock ret, int numGroups, Operator op, int cl, int cu ) 
+			throws DMLRuntimeException
+		{
+			_groups = groups;
+			_target = target;
+			_weights = weights;
+			_ret = ret;
+			_numGroups = numGroups;
+			_op = op;
+			_cl = cl;
+			_cu = cu;
+		}
+		
+		@Override
+		public Object call() throws DMLRuntimeException
+		{
+			//CM operator for count, mean, variance
+			//note: current support only for column vectors
+			if( _op instanceof CMOperator ) {
+				CMOperator cmOp = (CMOperator) _op;
+				groupedAggregateCM(_groups, _target, _weights, _ret, _numGroups, cmOp, _cl, _cu);
+			}
+			//Aggregate operator for sum (via kahan sum)
+			//note: support for row/column vectors and dense/sparse
+			else if( _op instanceof AggregateOperator ) {
+				AggregateOperator aggop = (AggregateOperator) _op;
+				groupedAggregateKahanPlus(_groups, _target, _weights, _ret, _numGroups, aggop, _cl, _cu);
+			}
+			
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index ca569f3..c620908 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -47,7 +47,6 @@ import org.apache.sysml.parser.DMLTranslator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.functionobjects.Builtin;
-import org.apache.sysml.runtime.functionobjects.CM;
 import org.apache.sysml.runtime.functionobjects.CTable;
 import org.apache.sysml.runtime.functionobjects.DiagIndex;
 import org.apache.sysml.runtime.functionobjects.Divide;
@@ -5289,6 +5288,25 @@ public class MatrixBlock extends MatrixValue implements Externalizable
 	public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op) 
 		throws DMLRuntimeException, DMLUnsupportedOperationException 
 	{
+		//single-threaded grouped aggregate 
+		return groupedAggOperations(tgt, wghts, ret, ngroups, op, 1);
+	}
+	
+	/**
+	 * 
+	 * @param tgt
+	 * @param wghts
+	 * @param ret
+	 * @param ngroups
+	 * @param op
+	 * @param k
+	 * @return
+	 * @throws DMLRuntimeException
+	 * @throws DMLUnsupportedOperationException
+	 */
+	public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k) 
+		throws DMLRuntimeException, DMLUnsupportedOperationException 		
+	{
 		//setup input matrices
 		// this <- groups
 		MatrixBlock target = checkType(tgt);
@@ -5332,228 +5350,15 @@ public class MatrixBlock extends MatrixValue implements Externalizable
 		else
 			result.reset(numGroups, rowVector?1:target.getNumColumns(), result_sparsity);
 
-		//CM operator for count, mean, variance
-		//note: current support only for column vectors
-		if(op instanceof CMOperator) 
-		{
-			// initialize required objects for storing the result of CM operations
-			CMOperator cmOp = (CMOperator) op;
-			
-			groupedAggregateCM(target, weights, result, cmOp);
-		}
-		//Aggregate operator for sum (via kahan sum)
-		//note: support for row/column vectors and dense/sparse
-		else if( op instanceof AggregateOperator ) 
-		{
-			//the only aggregate operator that is supported here is sum,
-			//furthermore, we always use KahanPlus and hence aggop.correctionExists is true
-			
-			AggregateOperator aggop = (AggregateOperator) op;
-				
-			//default case for aggregate(sum)
-			groupedAggregateKahanPlus(target, weights, result, aggop);
-		}
+		//execute grouped aggregate operation
+		if( k > 1 )
+			LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op, k);
 		else
-			throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate.");
+			LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op);
 		
 		return result;
 	}
 	
-
-	/**
-	 * This is a specific implementation for aggregate(fn="sum"), where we use KahanPlus for numerical
-	 * stability. In contrast to other functions of aggregate, this implementation supports row and column
-	 * vectors for target and exploits sparse representations since KahanPlus is sparse-safe.
-	 * 
-	 * @param target
-	 * @param weights
-	 * @param op
-	 * @throws DMLRuntimeException 
-	 */
-	private void groupedAggregateKahanPlus( MatrixBlock target, MatrixBlock weights, MatrixBlock result, AggregateOperator aggop ) 
-		throws DMLRuntimeException
-	{
-		boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
-		int numCols = (!rowVector) ? target.getNumColumns() : 1;
-		double w = 1; //default weight
-		
-		//skip empty blocks (sparse-safe operation)
-		if( target.isEmptyBlock(false) ) 
-			return;
-		
-		//init group buffers
-		KahanObject[][] buffer = new KahanObject[numGroups][numCols];
-		for( int i=0; i<numGroups; i++ )
-			for( int j=0; j<numCols; j++ )
-				buffer[i][j] = new KahanObject(aggop.initialValue, 0);
-			
-		if( rowVector ) //target is rowvector
-		{	
-			if( target.sparse ) //SPARSE target
-			{
-				if( target.sparseRows[0]!=null )
-				{
-					int len = target.sparseRows[0].size();
-					int[] aix = target.sparseRows[0].getIndexContainer();
-					double[] avals = target.sparseRows[0].getValueContainer();	
-					for( int j=0; j<len; j++ ) //for each nnz
-					{
-						int g = (int) this.quickGetValue(aix[j], 0);		
-						if ( g > numGroups )
-							continue;
-						if ( weights != null )
-							w = weights.quickGetValue(aix[j],0);
-						aggop.increOp.fn.execute(buffer[g-1][0], avals[j]*w);						
-					}
-				}
-					
-			}
-			else //DENSE target
-			{
-				for ( int i=0; i < target.getNumColumns(); i++ ) {
-					double d = target.denseBlock[ i ];
-					if( d != 0 ) //sparse-safe
-					{
-						int g = (int) this.quickGetValue(i, 0);		
-						if ( g > numGroups )
-							continue;
-						if ( weights != null )
-							w = weights.quickGetValue(i,0);
-						// buffer is 0-indexed, whereas range of values for g = [1,numGroups]
-						aggop.increOp.fn.execute(buffer[g-1][0], d*w);
-					}
-				}
-			}
-		}
-		else //column vector or matrix 
-		{
-			if( target.sparse ) //SPARSE target
-			{
-				SparseRow[] a = target.sparseRows;
-				
-				for( int i=0; i < getNumRows(); i++ ) 
-				{
-					int g = (int) this.quickGetValue(i, 0);		
-					if ( g > numGroups )
-						continue;
-					
-					if( a[i] != null && !a[i].isEmpty() )
-					{
-						int len = a[i].size();
-						int[] aix = a[i].getIndexContainer();
-						double[] avals = a[i].getValueContainer();	
-						for( int j=0; j<len; j++ ) //for each nnz
-						{
-							if ( weights != null )
-								w = weights.quickGetValue(aix[j],0);
-							aggop.increOp.fn.execute(buffer[g-1][aix[j]], avals[j]*w);						
-						}
-					}
-				}
-			}
-			else //DENSE target
-			{
-				double[] a = target.denseBlock;
-				
-				for( int i=0, aix=0; i < getNumRows(); i++, aix+=numCols ) 
-				{
-					int g = (int) this.quickGetValue(i, 0);		
-					if ( g > numGroups )
-						continue;
-				
-					for( int j=0; j < numCols; j++ ) {
-						double d = a[ aix+j ];
-						if( d != 0 ) { //sparse-safe
-							if ( weights != null )
-								w = weights.quickGetValue(i,0);
-							// buffer is 0-indexed, whereas range of values for g = [1,numGroups]
-							aggop.increOp.fn.execute(buffer[g-1][j], d*w);
-						}
-					}
-				}
-			}
-		}
-		
-		// extract the results from group buffers
-		for( int i=0; i < numGroups; i++ )
-			for( int j=0; j < numCols; j++ )
-				result.appendValue(i, j, buffer[i][j]._sum);
-	}
-
-	/**
-	 * 
-	 * @param target
-	 * @param weights
-	 * @param result
-	 * @param cmOp
-	 * @throws DMLRuntimeException
-	 */
-	private void groupedAggregateCM( MatrixBlock target, MatrixBlock weights, MatrixBlock result, CMOperator cmOp ) 
-		throws DMLRuntimeException
-	{
-		CM cmFn = CM.getCMFnObject(((CMOperator) cmOp).getAggOpType());
-		double w = 1; //default weight
-		
-		//init group buffers
-		CM_COV_Object[][] cmValues = new CM_COV_Object[numGroups][target.clen];
-		for ( int i=0; i < numGroups; i++ )
-			for( int j=0; j < target.clen; j++  )
-				cmValues[i][j] = new CM_COV_Object();
-		
-		
-		//column vector or matrix
-		if( target.sparse ) //SPARSE target
-		{
-			SparseRow[] a = target.sparseRows;
-			
-			for( int i=0; i < getNumRows(); i++ ) 
-			{
-				int g = (int) this.quickGetValue(i, 0);		
-				if ( g > numGroups )
-					continue;
-				
-				if( a[i] != null && !a[i].isEmpty() )
-				{
-					int len = a[i].size();
-					int[] aix = a[i].getIndexContainer();
-					double[] avals = a[i].getValueContainer();	
-					for( int j=0; j<len; j++ ) //for each nnz
-					{
-						if ( weights != null )
-							w = weights.quickGetValue(aix[j],0);
-						cmFn.execute(cmValues[g-1][aix[j]], avals[j], w);						
-					}
-					//TODO sparse unsafe correction
-				}
-			}
-		}
-		else //DENSE target
-		{
-			double[] a = target.denseBlock;
-			
-			for( int i=0, aix=0; i < getNumRows(); i++, aix+=target.clen ) 
-			{
-				int g = (int) this.quickGetValue(i, 0);		
-				if ( g > numGroups )
-					continue;
-			
-				for( int j=0; j < target.clen; j++ ) {
-					double d = a[ aix+j ]; //sparse unsafe
-					if ( weights != null )
-						w = weights.quickGetValue(i,0);
-					// buffer is 0-indexed, whereas range of values for g = [1,numGroups]
-					cmFn.execute(cmValues[g-1][j], d, w);
-				}
-			}
-		}
-		
-		// extract the required value from each CM_COV_Object
-		for( int i=0; i < numGroups; i++ ) 
-			for( int j=0; j < target.clen; j++ ) {
-				// result is 0-indexed, so is cmValues
-				result.appendValue(i, j, cmValues[i][j].getRequiredResult(cmOp));
-			}			
-	}
 	
 	/**
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java b/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
index 6a165f2..f3e4201 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
@@ -157,6 +157,16 @@ public class FullGroupedAggregateMatrixTest extends AutomatedTestBase
 		runGroupedAggregateOperationTest(TEST_NAME1, OpType.MOMENT4, true, ExecType.CP);
 	}
 
+	@Test
+	public void testGroupedAggSumDenseWideCP() {
+		runGroupedAggregateOperationTest(TEST_NAME1, OpType.SUM, false, ExecType.CP, cols2);
+	}
+	
+	@Test
+	public void testGroupedAggSumSparseWideCP() {
+		runGroupedAggregateOperationTest(TEST_NAME1, OpType.SUM, true, ExecType.CP, cols2);
+	}
+	
 	//special CP testcases (negative)
 	
 	@Test