You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2015/12/23 12:03:24 UTC
incubator-systemml git commit: Multi-threaded grouped aggregate (col
parallelization), incl refactoring
Repository: incubator-systemml
Updated Branches:
refs/heads/master 7290510ec -> 039d55c0a
Multi-threaded grouped aggregate (col parallelization), incl refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/039d55c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/039d55c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/039d55c0
Branch: refs/heads/master
Commit: 039d55c0a02c02e2b7f0df57991a5c985d95503b
Parents: 7290510
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Tue Dec 22 17:32:42 2015 +0100
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Dec 22 17:32:42 2015 +0100
----------------------------------------------------------------------
.../sysml/hops/ParameterizedBuiltinOp.java | 93 +++---
.../org/apache/sysml/lops/GroupedAggregate.java | 22 +-
.../parfor/opt/OptimizerRuleBased.java | 7 +-
.../cp/ParameterizedBuiltinCPInstruction.java | 3 +-
.../sysml/runtime/matrix/data/LibMatrixAgg.java | 334 ++++++++++++++++++-
.../sysml/runtime/matrix/data/MatrixBlock.java | 241 ++-----------
.../FullGroupedAggregateMatrixTest.java | 10 +
7 files changed, 443 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 51d4ba7..99eebaa 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -22,6 +22,7 @@ package org.apache.sysml.hops;
import java.util.HashMap;
import java.util.Map.Entry;
+import org.apache.sysml.hops.Hop.MultiThreadedHop;
import org.apache.sysml.hops.rewrite.HopRewriteUtils;
import org.apache.sysml.lops.Aggregate;
import org.apache.sysml.lops.AppendR;
@@ -45,19 +46,19 @@ import org.apache.sysml.runtime.util.UtilFunctions;
* Defines the HOP for calling an internal function (with custom parameters) from a DML script.
*
*/
-public class ParameterizedBuiltinOp extends Hop
-{
-
+public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
+{
private static boolean COMPILE_PARALLEL_REMOVEEMPTY = true;
public static boolean FORCE_DIST_RM_EMPTY = false;
//operator type
private ParamBuiltinOp _op;
+
+ private int _maxNumThreads = -1; //-1 for unlimited
//removeEmpty hints
private boolean _outputEmptyBlocks = true;
private boolean _outputPermutationMatrix = false;
-
private boolean _bRmEmptyBC = false;
/**
@@ -130,6 +131,16 @@ public class ParameterizedBuiltinOp extends Hop
}
@Override
+ public void setMaxNumThreads( int k ) {
+ _maxNumThreads = k;
+ }
+
+ @Override
+ public int getMaxNumThreads() {
+ return _maxNumThreads;
+ }
+
+ @Override
public Lop constructLops()
throws HopsException, LopsException
{
@@ -214,11 +225,31 @@ public class ParameterizedBuiltinOp extends Hop
//reset reblock requirement (see MR aggregate / construct lops)
setRequiresReblock( false );
+ //determine output dimensions
+ long outputDim1=-1, outputDim2=-1;
+ Lop numGroups = inputlops.get(Statement.GAGG_NUM_GROUPS);
+ if ( !dimsKnown() && numGroups != null && numGroups instanceof Data && ((Data)numGroups).isLiteral() ) {
+ long ngroups = ((Data)numGroups).getLongValue();
+
+ Lop input = inputlops.get(GroupedAggregate.COMBINEDINPUT);
+ long inDim1 = input.getOutputParameters().getNumRows();
+ long inDim2 = input.getOutputParameters().getNumCols();
+ boolean rowwise = (inDim1==1 && inDim2 > 1 );
+
+ if( rowwise ) { //vector
+ outputDim1 = ngroups;
+ outputDim2 = 1;
+ }
+ else { //vector or matrix
+ outputDim1 = inDim2;
+ outputDim2 = ngroups;
+ }
+ }
+
+ //construct lops
if ( et == ExecType.MR )
{
- // construct necessary lops: combineBinary/combineTertiary and
- // groupedAgg
-
+ // construct necessary lops: combineBinary/combineTertiary and groupedAgg
boolean isWeighted = (_paramIndexMap.get(Statement.GAGG_WEIGHTS) != null);
if (isWeighted)
{
@@ -284,27 +315,6 @@ public class ParameterizedBuiltinOp extends Hop
inputlops.remove(Statement.GAGG_GROUPS);
}
- long outputDim1=-1, outputDim2=-1;
- Lop numGroups = inputlops.get(Statement.GAGG_NUM_GROUPS);
- if ( !dimsKnown() && numGroups != null && numGroups instanceof Data && ((Data)numGroups).isLiteral() ) {
- long ngroups = ((Data)numGroups).getLongValue();
-
- Lop input = inputlops.get(GroupedAggregate.COMBINEDINPUT);
- long inDim1 = input.getOutputParameters().getNumRows();
- long inDim2 = input.getOutputParameters().getNumCols();
- boolean rowwise = (inDim1==1 && inDim2 > 1 );
-
- if( rowwise ) { //vector
- outputDim1 = ngroups;
- outputDim2 = 1;
- }
- else { //vector or matrix
- outputDim1 = inDim2;
- outputDim2 = ngroups;
- }
-
- }
-
GroupedAggregate grp_agg = new GroupedAggregate(inputlops, isWeighted, getDataType(), getValueType());
grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, getRowsInBlock(), getColsInBlock(), -1);
setLineNumbers(grp_agg);
@@ -312,26 +322,25 @@ public class ParameterizedBuiltinOp extends Hop
setLops(grp_agg);
setRequiresReblock( true );
}
- else //CP
+ else //CP/Spark
{
- GroupedAggregate grp_agg = new GroupedAggregate(inputlops,
- getDataType(), getValueType(), et);
- // output dimensions are unknown at compilation time
- grp_agg.getOutputParameters().setDimensions(-1, -1, -1, -1, -1);
- grp_agg.setAllPositions(this.getBeginLine(), this.getBeginColumn(), this.getEndLine(), this.getEndColumn());
+ GroupedAggregate grp_agg = null;
- // introduce a reblock lop only if it is NOT single_node execution
- if( et == ExecType.CP){
- //force blocked output in CP (see below)
- grp_agg.getOutputParameters().setDimensions(-1, 1, getRowsInBlock(), getColsInBlock(), -1);
+ if( et == ExecType.CP)
+ {
+ int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );
+ grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, k);
+ grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, getRowsInBlock(), getColsInBlock(), -1);
}
- else if(et == ExecType.SPARK) {
- grp_agg.getOutputParameters().setDimensions(-1, 1, -1, -1, -1);
+ else if(et == ExecType.SPARK)
+ {
+ grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et);
+ grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
setRequiresReblock( true );
}
- //grouped agg, w/o reblock in CP
- setLops(grp_agg);
+ setLineNumbers(grp_agg);
+ setLops(grp_agg);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/GroupedAggregate.java b/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
index 9498e94..f54857d 100644
--- a/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
+++ b/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
@@ -39,8 +39,9 @@ public class GroupedAggregate extends Lop
private static final String opcode = "groupedagg";
public static final String COMBINEDINPUT = "combinedinput";
- private boolean _weights = false;
-
+ private boolean _weights = false;
+ private int _numThreads = 1;
+
/**
* Constructor to perform grouped aggregate.
* inputParameterLops <- parameters required to compute different aggregates (hashmap)
@@ -62,6 +63,14 @@ public class GroupedAggregate extends Lop
init(inputParameterLops, dt, vt, et);
}
+ public GroupedAggregate(
+ HashMap<String, Lop> inputParameterLops,
+ DataType dt, ValueType vt, ExecType et, int k) {
+ super(Lop.Type.GroupedAgg, dt, vt);
+ init(inputParameterLops, dt, vt, et);
+ _numThreads = k;
+ }
+
/**
*
* @param inputParameterLops
@@ -188,8 +197,15 @@ public class GroupedAggregate extends Lop
}
}
+ if( getExecType()==ExecType.CP ) {
+ sb.append( OPERAND_DELIMITOR );
+ sb.append( "k" );
+ sb.append( Lop.NAME_VALUE_SEPARATOR );
+ sb.append( _numThreads );
+ }
+
sb.append( OPERAND_DELIMITOR );
- sb.append( this.prepOutputOperand(output));
+ sb.append( prepOutputOperand(output));
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 9536db1..4eee876 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.hops.AggBinaryOp;
@@ -38,12 +37,14 @@ import org.apache.sysml.hops.FunctionOp;
import org.apache.sysml.hops.Hop;
import org.apache.sysml.hops.AggBinaryOp.MMultMethod;
import org.apache.sysml.hops.Hop.MultiThreadedHop;
+import org.apache.sysml.hops.Hop.ParamBuiltinOp;
import org.apache.sysml.hops.Hop.ReOrgOp;
import org.apache.sysml.hops.HopsException;
import org.apache.sysml.hops.IndexingOp;
import org.apache.sysml.hops.LeftIndexingOp;
import org.apache.sysml.hops.LiteralOp;
import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.hops.ParameterizedBuiltinOp;
import org.apache.sysml.hops.ReorgOp;
import org.apache.sysml.hops.rewrite.HopRewriteUtils;
import org.apache.sysml.hops.rewrite.ProgramRewriteStatus;
@@ -1523,7 +1524,9 @@ public class OptimizerRuleBased extends Optimizer
//set degree of parallelism for multi-threaded leaf nodes
Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(c.getID());
if( OptimizerUtils.PARALLEL_CP_MATRIX_MULTIPLY
- && h instanceof MultiThreadedHop ) //abop, datagenop, qop
+ && h instanceof MultiThreadedHop //abop, datagenop, qop, paramop
+ && !( h instanceof ParameterizedBuiltinOp //only paramop-grpagg
+ && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG) )
{
MultiThreadedHop mhop = (MultiThreadedHop) h;
mhop.setMaxNumThreads(opsK); //set max constraint in hop
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index f2705cc..bd3153a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -165,7 +165,8 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
}
// compute the result
- MatrixBlock soresBlock = (MatrixBlock) (groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr));
+ int k = Integer.parseInt(params.get("k")); //num threads
+ MatrixBlock soresBlock = (MatrixBlock) (groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k));
ec.setMatrixOutput(output.getName(), soresBlock);
// release locks
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 21a5a6b..1373df3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -30,6 +30,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.functionobjects.Builtin;
import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinFunctionCode;
+import org.apache.sysml.runtime.functionobjects.CM;
import org.apache.sysml.runtime.functionobjects.IndexFunction;
import org.apache.sysml.runtime.functionobjects.KahanFunction;
import org.apache.sysml.runtime.functionobjects.KahanPlus;
@@ -41,9 +42,12 @@ import org.apache.sysml.runtime.functionobjects.ReduceCol;
import org.apache.sysml.runtime.functionobjects.ReduceDiag;
import org.apache.sysml.runtime.functionobjects.ReduceRow;
import org.apache.sysml.runtime.functionobjects.ValueFunction;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
import org.apache.sysml.runtime.instructions.cp.KahanObject;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.CMOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
import org.apache.sysml.runtime.util.UtilFunctions;
@@ -385,9 +389,89 @@ public class LibMatrixAgg
return val;
}
+
+ /**
+ *
+ * @param groups
+ * @param target
+ * @param weights
+ * @param result
+ * @param numGroups
+ * @param op
+ * @throws DMLRuntimeException
+ */
+ public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op)
+ throws DMLRuntimeException
+ {
+ if( !(op instanceof CMOperator || op instanceof AggregateOperator) ) {
+ throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate.");
+ }
+
+ //CM operator for count, mean, variance
+ //note: current support only for column vectors
+ if(op instanceof CMOperator) {
+ CMOperator cmOp = (CMOperator) op;
+ groupedAggregateCM(groups, target, weights, result, numGroups, cmOp, 0, target.clen);
+ }
+ //Aggregate operator for sum (via kahan sum)
+ //note: support for row/column vectors and dense/sparse
+ else if( op instanceof AggregateOperator ) {
+ AggregateOperator aggop = (AggregateOperator) op;
+ groupedAggregateKahanPlus(groups, target, weights, result, numGroups, aggop, 0, target.clen);
+ }
+ }
/**
*
+ * @param groups
+ * @param target
+ * @param weights
+ * @param result
+ * @param numGroups
+ * @param op
+ * @param k
+ * @throws DMLRuntimeException
+ */
+ public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op, int k)
+ throws DMLRuntimeException
+ {
+ //fall back to sequential version if necessary
+ boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
+ if( k <= 1 || (long)target.rlen*target.clen < PAR_NUMCELL_THRESHOLD || rowVector ) {
+ groupedAggregate(groups, target, weights, result, numGroups, op);
+ return;
+ }
+
+ if( !(op instanceof CMOperator || op instanceof AggregateOperator) ) {
+ throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate.");
+ }
+
+ //preprocessing
+ result.sparse = false;
+ result.allocateDenseBlock();
+
+ //core multi-threaded grouped aggregate computation
+ //(currently: parallelization over columns to avoid additional memory requirements)
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ ArrayList<GrpAggTask> tasks = new ArrayList<GrpAggTask>();
+ int blklen = (int)(Math.ceil((double)target.clen/k));
+ for( int i=0; i<k & i*blklen<target.clen; i++ )
+ tasks.add( new GrpAggTask(groups, target, weights, result, numGroups, op, i*blklen, Math.min((i+1)*blklen, target.clen)) );
+ pool.invokeAll(tasks);
+ pool.shutdown();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ //postprocessing
+ result.recomputeNonZeros();
+ result.examSparsity();
+ }
+
+ /**
+ *
* @param op
* @return
*/
@@ -619,6 +703,211 @@ public class LibMatrixAgg
return kbuff._sum;
}
+
+ /**
+ * This is a specific implementation for aggregate(fn="sum"), where we use KahanPlus for numerical
+ * stability. In contrast to other functions of aggregate, this implementation supports row and column
+ * vectors for target and exploits sparse representations since KahanPlus is sparse-safe.
+ *
+ * @param target
+ * @param weights
+ * @param op
+ * @throws DMLRuntimeException
+ */
+ private static void groupedAggregateKahanPlus( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, AggregateOperator aggop, int cl, int cu )
+ throws DMLRuntimeException
+ {
+ boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
+ int numCols = (!rowVector) ? target.getNumColumns() : 1;
+ double w = 1; //default weight
+
+ //skip empty blocks (sparse-safe operation)
+ if( target.isEmptyBlock(false) )
+ return;
+
+ //init group buffers
+ int numCols2 = cu-cl;
+ KahanObject[][] buffer = new KahanObject[numGroups][numCols2];
+ for( int i=0; i<numGroups; i++ )
+ for( int j=0; j<numCols2; j++ )
+ buffer[i][j] = new KahanObject(aggop.initialValue, 0);
+
+ if( rowVector ) //target is rowvector
+ {
+ //note: always sequential, no need to respect cl/cu
+
+ if( target.sparse ) //SPARSE target
+ {
+ if( target.sparseRows[0]!=null )
+ {
+ int len = target.sparseRows[0].size();
+ int[] aix = target.sparseRows[0].getIndexContainer();
+ double[] avals = target.sparseRows[0].getValueContainer();
+ for( int j=0; j<len; j++ ) //for each nnz
+ {
+ int g = (int) groups.quickGetValue(aix[j], 0);
+ if ( g > numGroups )
+ continue;
+ if ( weights != null )
+ w = weights.quickGetValue(aix[j],0);
+ aggop.increOp.fn.execute(buffer[g-1][0], avals[j]*w);
+ }
+ }
+
+ }
+ else //DENSE target
+ {
+ for ( int i=0; i < target.getNumColumns(); i++ ) {
+ double d = target.denseBlock[ i ];
+ if( d != 0 ) //sparse-safe
+ {
+ int g = (int) groups.quickGetValue(i, 0);
+ if ( g > numGroups )
+ continue;
+ if ( weights != null )
+ w = weights.quickGetValue(i,0);
+ // buffer is 0-indexed, whereas range of values for g = [1,numGroups]
+ aggop.increOp.fn.execute(buffer[g-1][0], d*w);
+ }
+ }
+ }
+ }
+ else //column vector or matrix
+ {
+ if( target.sparse ) //SPARSE target
+ {
+ SparseRow[] a = target.sparseRows;
+
+ for( int i=0; i < groups.getNumRows(); i++ )
+ {
+ int g = (int) groups.quickGetValue(i, 0);
+ if ( g > numGroups )
+ continue;
+
+ if( a[i] != null && !a[i].isEmpty() )
+ {
+ int len = a[i].size();
+ int[] aix = a[i].getIndexContainer();
+ double[] avals = a[i].getValueContainer();
+ int j = (cl==0) ? 0 : a[i].searchIndexesFirstGTE(cl);
+ j = (j>=0) ? j : len;
+
+ for( ; j<len && aix[j]<cu; j++ ) //for each nnz
+ {
+ if ( weights != null )
+ w = weights.quickGetValue(aix[j],0);
+ aggop.increOp.fn.execute(buffer[g-1][aix[j]-cl], avals[j]*w);
+ }
+ }
+ }
+ }
+ else //DENSE target
+ {
+ double[] a = target.denseBlock;
+
+ for( int i=0, aix=0; i < groups.getNumRows(); i++, aix+=numCols )
+ {
+ int g = (int) groups.quickGetValue(i, 0);
+ if ( g > numGroups )
+ continue;
+
+ for( int j=cl; j < cu; j++ ) {
+ double d = a[ aix+j ];
+ if( d != 0 ) { //sparse-safe
+ if ( weights != null )
+ w = weights.quickGetValue(i,0);
+ // buffer is 0-indexed, whereas range of values for g = [1,numGroups]
+ aggop.increOp.fn.execute(buffer[g-1][j-cl], d*w);
+ }
+ }
+ }
+ }
+ }
+
+ // extract the results from group buffers
+ for( int i=0; i < numGroups; i++ )
+ for( int j=0; j < numCols2; j++ )
+ result.appendValue(i, j+cl, buffer[i][j]._sum);
+ }
+
+ /**
+ *
+ * @param target
+ * @param weights
+ * @param result
+ * @param cmOp
+ * @throws DMLRuntimeException
+ */
+ private static void groupedAggregateCM( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, CMOperator cmOp, int cl, int cu )
+ throws DMLRuntimeException
+ {
+ CM cmFn = CM.getCMFnObject(((CMOperator) cmOp).getAggOpType());
+ double w = 1; //default weight
+
+ //init group buffers
+ int numCols2 = cu-cl;
+ CM_COV_Object[][] cmValues = new CM_COV_Object[numGroups][numCols2];
+ for ( int i=0; i < numGroups; i++ )
+ for( int j=0; j < numCols2; j++ )
+ cmValues[i][j] = new CM_COV_Object();
+
+ //column vector or matrix
+ if( target.sparse ) //SPARSE target
+ {
+ SparseRow[] a = target.sparseRows;
+
+ for( int i=0; i < groups.getNumRows(); i++ )
+ {
+ int g = (int) groups.quickGetValue(i, 0);
+ if ( g > numGroups )
+ continue;
+
+ if( a[i] != null && !a[i].isEmpty() )
+ {
+ int len = a[i].size();
+ int[] aix = a[i].getIndexContainer();
+ double[] avals = a[i].getValueContainer();
+ int j = (cl==0) ? 0 : a[i].searchIndexesFirstGTE(cl);
+ j = (j>=0) ? j : len;
+
+ for( ; j<len && aix[j]<cu; j++ ) //for each nnz
+ {
+ if ( weights != null )
+ w = weights.quickGetValue(i, 0);
+ cmFn.execute(cmValues[g-1][aix[j]-cl], avals[j], w);
+ }
+ //TODO sparse unsafe correction
+ }
+ }
+ }
+ else //DENSE target
+ {
+ double[] a = target.denseBlock;
+
+ for( int i=0, aix=0; i < groups.getNumRows(); i++, aix+=target.clen )
+ {
+ int g = (int) groups.quickGetValue(i, 0);
+ if ( g > numGroups )
+ continue;
+
+ for( int j=cl; j<cu; j++ ) {
+ double d = a[ aix+j ]; //sparse unsafe
+ if ( weights != null )
+ w = weights.quickGetValue(i,0);
+ // buffer is 0-indexed, whereas range of values for g = [1,numGroups]
+ cmFn.execute(cmValues[g-1][j-cl], d, w);
+ }
+ }
+ }
+
+ // extract the required value from each CM_COV_Object
+ for( int i=0; i < numGroups; i++ )
+ for( int j=0; j < numCols2; j++ ) {
+ // result is 0-indexed, so is cmValues
+ result.appendValue(i, j, cmValues[i][j+cl].getRequiredResult(cmOp));
+ }
+ }
+
/**
*
* @param in
@@ -2861,5 +3150,48 @@ public class LibMatrixAgg
return _ret;
}
}
-}
+
+ private static class GrpAggTask extends AggTask
+ {
+ private MatrixBlock _groups = null;
+ private MatrixBlock _target = null;
+ private MatrixBlock _weights = null;
+ private MatrixBlock _ret = null;
+ private int _numGroups = -1;
+ private Operator _op = null;
+ private int _cl = -1;
+ private int _cu = -1;
+ protected GrpAggTask( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock ret, int numGroups, Operator op, int cl, int cu )
+ throws DMLRuntimeException
+ {
+ _groups = groups;
+ _target = target;
+ _weights = weights;
+ _ret = ret;
+ _numGroups = numGroups;
+ _op = op;
+ _cl = cl;
+ _cu = cu;
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException
+ {
+ //CM operator for count, mean, variance
+ //note: current support only for column vectors
+ if( _op instanceof CMOperator ) {
+ CMOperator cmOp = (CMOperator) _op;
+ groupedAggregateCM(_groups, _target, _weights, _ret, _numGroups, cmOp, _cl, _cu);
+ }
+ //Aggregate operator for sum (via kahan sum)
+ //note: support for row/column vectors and dense/sparse
+ else if( _op instanceof AggregateOperator ) {
+ AggregateOperator aggop = (AggregateOperator) _op;
+ groupedAggregateKahanPlus(_groups, _target, _weights, _ret, _numGroups, aggop, _cl, _cu);
+ }
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index ca569f3..c620908 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -47,7 +47,6 @@ import org.apache.sysml.parser.DMLTranslator;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.functionobjects.Builtin;
-import org.apache.sysml.runtime.functionobjects.CM;
import org.apache.sysml.runtime.functionobjects.CTable;
import org.apache.sysml.runtime.functionobjects.DiagIndex;
import org.apache.sysml.runtime.functionobjects.Divide;
@@ -5289,6 +5288,25 @@ public class MatrixBlock extends MatrixValue implements Externalizable
public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op)
throws DMLRuntimeException, DMLUnsupportedOperationException
{
+ //single-threaded grouped aggregate
+ return groupedAggOperations(tgt, wghts, ret, ngroups, op, 1);
+ }
+
+ /**
+ *
+ * @param tgt
+ * @param wghts
+ * @param ret
+ * @param ngroups
+ * @param op
+ * @param k
+ * @return
+ * @throws DMLRuntimeException
+ * @throws DMLUnsupportedOperationException
+ */
+ public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k)
+ throws DMLRuntimeException, DMLUnsupportedOperationException
+ {
//setup input matrices
// this <- groups
MatrixBlock target = checkType(tgt);
@@ -5332,228 +5350,15 @@ public class MatrixBlock extends MatrixValue implements Externalizable
else
result.reset(numGroups, rowVector?1:target.getNumColumns(), result_sparsity);
- //CM operator for count, mean, variance
- //note: current support only for column vectors
- if(op instanceof CMOperator)
- {
- // initialize required objects for storing the result of CM operations
- CMOperator cmOp = (CMOperator) op;
-
- groupedAggregateCM(target, weights, result, cmOp);
- }
- //Aggregate operator for sum (via kahan sum)
- //note: support for row/column vectors and dense/sparse
- else if( op instanceof AggregateOperator )
- {
- //the only aggregate operator that is supported here is sum,
- //furthermore, we always use KahanPlus and hence aggop.correctionExists is true
-
- AggregateOperator aggop = (AggregateOperator) op;
-
- //default case for aggregate(sum)
- groupedAggregateKahanPlus(target, weights, result, aggop);
- }
+ //execute grouped aggregate operation
+ if( k > 1 )
+ LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op, k);
else
- throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate.");
+ LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op);
return result;
}
-
- /**
- * This is a specific implementation for aggregate(fn="sum"), where we use KahanPlus for numerical
- * stability. In contrast to other functions of aggregate, this implementation supports row and column
- * vectors for target and exploits sparse representations since KahanPlus is sparse-safe.
- *
- * @param target
- * @param weights
- * @param op
- * @throws DMLRuntimeException
- */
- private void groupedAggregateKahanPlus( MatrixBlock target, MatrixBlock weights, MatrixBlock result, AggregateOperator aggop )
- throws DMLRuntimeException
- {
- boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
- int numCols = (!rowVector) ? target.getNumColumns() : 1;
- double w = 1; //default weight
-
- //skip empty blocks (sparse-safe operation)
- if( target.isEmptyBlock(false) )
- return;
-
- //init group buffers
- KahanObject[][] buffer = new KahanObject[numGroups][numCols];
- for( int i=0; i<numGroups; i++ )
- for( int j=0; j<numCols; j++ )
- buffer[i][j] = new KahanObject(aggop.initialValue, 0);
-
- if( rowVector ) //target is rowvector
- {
- if( target.sparse ) //SPARSE target
- {
- if( target.sparseRows[0]!=null )
- {
- int len = target.sparseRows[0].size();
- int[] aix = target.sparseRows[0].getIndexContainer();
- double[] avals = target.sparseRows[0].getValueContainer();
- for( int j=0; j<len; j++ ) //for each nnz
- {
- int g = (int) this.quickGetValue(aix[j], 0);
- if ( g > numGroups )
- continue;
- if ( weights != null )
- w = weights.quickGetValue(aix[j],0);
- aggop.increOp.fn.execute(buffer[g-1][0], avals[j]*w);
- }
- }
-
- }
- else //DENSE target
- {
- for ( int i=0; i < target.getNumColumns(); i++ ) {
- double d = target.denseBlock[ i ];
- if( d != 0 ) //sparse-safe
- {
- int g = (int) this.quickGetValue(i, 0);
- if ( g > numGroups )
- continue;
- if ( weights != null )
- w = weights.quickGetValue(i,0);
- // buffer is 0-indexed, whereas range of values for g = [1,numGroups]
- aggop.increOp.fn.execute(buffer[g-1][0], d*w);
- }
- }
- }
- }
- else //column vector or matrix
- {
- if( target.sparse ) //SPARSE target
- {
- SparseRow[] a = target.sparseRows;
-
- for( int i=0; i < getNumRows(); i++ )
- {
- int g = (int) this.quickGetValue(i, 0);
- if ( g > numGroups )
- continue;
-
- if( a[i] != null && !a[i].isEmpty() )
- {
- int len = a[i].size();
- int[] aix = a[i].getIndexContainer();
- double[] avals = a[i].getValueContainer();
- for( int j=0; j<len; j++ ) //for each nnz
- {
- if ( weights != null )
- w = weights.quickGetValue(aix[j],0);
- aggop.increOp.fn.execute(buffer[g-1][aix[j]], avals[j]*w);
- }
- }
- }
- }
- else //DENSE target
- {
- double[] a = target.denseBlock;
-
- for( int i=0, aix=0; i < getNumRows(); i++, aix+=numCols )
- {
- int g = (int) this.quickGetValue(i, 0);
- if ( g > numGroups )
- continue;
-
- for( int j=0; j < numCols; j++ ) {
- double d = a[ aix+j ];
- if( d != 0 ) { //sparse-safe
- if ( weights != null )
- w = weights.quickGetValue(i,0);
- // buffer is 0-indexed, whereas range of values for g = [1,numGroups]
- aggop.increOp.fn.execute(buffer[g-1][j], d*w);
- }
- }
- }
- }
- }
-
- // extract the results from group buffers
- for( int i=0; i < numGroups; i++ )
- for( int j=0; j < numCols; j++ )
- result.appendValue(i, j, buffer[i][j]._sum);
- }
-
- /**
- *
- * @param target
- * @param weights
- * @param result
- * @param cmOp
- * @throws DMLRuntimeException
- */
- private void groupedAggregateCM( MatrixBlock target, MatrixBlock weights, MatrixBlock result, CMOperator cmOp )
- throws DMLRuntimeException
- {
- CM cmFn = CM.getCMFnObject(((CMOperator) cmOp).getAggOpType());
- double w = 1; //default weight
-
- //init group buffers
- CM_COV_Object[][] cmValues = new CM_COV_Object[numGroups][target.clen];
- for ( int i=0; i < numGroups; i++ )
- for( int j=0; j < target.clen; j++ )
- cmValues[i][j] = new CM_COV_Object();
-
-
- //column vector or matrix
- if( target.sparse ) //SPARSE target
- {
- SparseRow[] a = target.sparseRows;
-
- for( int i=0; i < getNumRows(); i++ )
- {
- int g = (int) this.quickGetValue(i, 0);
- if ( g > numGroups )
- continue;
-
- if( a[i] != null && !a[i].isEmpty() )
- {
- int len = a[i].size();
- int[] aix = a[i].getIndexContainer();
- double[] avals = a[i].getValueContainer();
- for( int j=0; j<len; j++ ) //for each nnz
- {
- if ( weights != null )
- w = weights.quickGetValue(aix[j],0);
- cmFn.execute(cmValues[g-1][aix[j]], avals[j], w);
- }
- //TODO sparse unsafe correction
- }
- }
- }
- else //DENSE target
- {
- double[] a = target.denseBlock;
-
- for( int i=0, aix=0; i < getNumRows(); i++, aix+=target.clen )
- {
- int g = (int) this.quickGetValue(i, 0);
- if ( g > numGroups )
- continue;
-
- for( int j=0; j < target.clen; j++ ) {
- double d = a[ aix+j ]; //sparse unsafe
- if ( weights != null )
- w = weights.quickGetValue(i,0);
- // buffer is 0-indexed, whereas range of values for g = [1,numGroups]
- cmFn.execute(cmValues[g-1][j], d, w);
- }
- }
- }
-
- // extract the required value from each CM_COV_Object
- for( int i=0; i < numGroups; i++ )
- for( int j=0; j < target.clen; j++ ) {
- // result is 0-indexed, so is cmValues
- result.appendValue(i, j, cmValues[i][j].getRequiredResult(cmOp));
- }
- }
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/039d55c0/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java b/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
index 6a165f2..f3e4201 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/aggregate/FullGroupedAggregateMatrixTest.java
@@ -157,6 +157,16 @@ public class FullGroupedAggregateMatrixTest extends AutomatedTestBase
runGroupedAggregateOperationTest(TEST_NAME1, OpType.MOMENT4, true, ExecType.CP);
}
+ @Test
+ public void testGroupedAggSumDenseWideCP() {
+ runGroupedAggregateOperationTest(TEST_NAME1, OpType.SUM, false, ExecType.CP, cols2);
+ }
+
+ @Test
+ public void testGroupedAggSumSparseWideCP() {
+ runGroupedAggregateOperationTest(TEST_NAME1, OpType.SUM, true, ExecType.CP, cols2);
+ }
+
//special CP testcases (negative)
@Test