You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/03/25 21:40:06 UTC
[2/5] incubator-systemml git commit: [SYSTEMML-396] New
multi-threaded cumulative aggregates (e.g., cumsum)
[SYSTEMML-396] New multi-threaded cumulative aggregates (e.g., cumsum)
This patch introduces multi-threaded cumulative aggregates, which is
especially useful for cumsum because the used numerically stable kahan
plus is rather costly. However, the speedup is limited as a significant
fraction of time is spent in output allocation. In experiments over both
dense and sparse, we have seen speedups of 1.5x-3x, depending the used
JDK.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1e8de49c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1e8de49c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1e8de49c
Branch: refs/heads/master
Commit: 1e8de49c6a690e1a130eeac47a6ad541bc33addb
Parents: 3d65cce
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Mar 24 17:14:52 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Mar 25 13:39:04 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/sysml/hops/UnaryOp.java | 29 +-
src/main/java/org/apache/sysml/lops/Unary.java | 60 ++--
.../parfor/opt/OptimizerRuleBased.java | 5 +-
.../runtime/instructions/InstructionUtils.java | 44 +++
.../cp/BuiltinUnaryCPInstruction.java | 10 +-
.../sysml/runtime/matrix/data/LibMatrixAgg.java | 306 ++++++++++++++++---
.../sysml/runtime/matrix/data/MatrixBlock.java | 5 +-
.../runtime/matrix/operators/UnaryOperator.java | 14 +-
8 files changed, 407 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/hops/UnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java b/src/main/java/org/apache/sysml/hops/UnaryOp.java
index fa0e401..e200691 100644
--- a/src/main/java/org/apache/sysml/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java
@@ -22,6 +22,7 @@ package org.apache.sysml.hops;
import java.util.ArrayList;
import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.hops.Hop.MultiThreadedHop;
import org.apache.sysml.lops.Aggregate;
import org.apache.sysml.lops.Aggregate.OperationTypes;
import org.apache.sysml.lops.CombineUnary;
@@ -48,11 +49,12 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
* Semantic: given a value, perform the operation (independent of other values)
*/
-public class UnaryOp extends Hop
+public class UnaryOp extends Hop implements MultiThreadedHop
{
-
private OpOp1 _op = null;
-
+
+ private int _maxNumThreads = -1; //-1 for unlimited
+
private UnaryOp() {
//default constructor for clone
@@ -98,6 +100,16 @@ public class UnaryOp extends Hop
}
@Override
+ public void setMaxNumThreads( int k ) {
+ _maxNumThreads = k;
+ }
+
+ @Override
+ public int getMaxNumThreads() {
+ return _maxNumThreads;
+ }
+
+ @Override
public Lop constructLops()
throws HopsException, LopsException
{
@@ -150,8 +162,9 @@ public class UnaryOp extends Hop
}
else //default unary
{
+ int k = isCumulativeUnaryOperation() ? OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1;
Unary unary1 = new Unary(input.constructLops(), HopsOpOp1LopsU.get(_op),
- getDataType(), getValueType(), et);
+ getDataType(), getValueType(), et, k);
setOutputDimensions(unary1);
setLineNumbers(unary1);
setLops(unary1);
@@ -398,8 +411,9 @@ public class UnaryOp extends Hop
}
//in-memory cum sum (of partial aggregates)
- if( TEMP.getOutputParameters().getNumRows()!=1 ){
- Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP);
+ if( TEMP.getOutputParameters().getNumRows()!=1 ) {
+ int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );
+ Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k);
unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(), clen, brlen, bclen, -1);
setLineNumbers(unary1);
TEMP = unary1;
@@ -473,7 +487,8 @@ public class UnaryOp extends Hop
//in-memory cum sum (of partial aggregates)
if( TEMP.getOutputParameters().getNumRows()!=1 ){
- Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP);
+ int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );
+ Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k);
unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(), clen, brlen, bclen, -1);
setLineNumbers(unary1);
TEMP = unary1;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/lops/Unary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Unary.java b/src/main/java/org/apache/sysml/lops/Unary.java
index 1f56d66..8ef3e7b 100644
--- a/src/main/java/org/apache/sysml/lops/Unary.java
+++ b/src/main/java/org/apache/sysml/lops/Unary.java
@@ -46,9 +46,12 @@ public class Unary extends Lop
NOTSUPPORTED
};
- OperationTypes operation;
+ private OperationTypes operation;
+ private Lop valInput;
+
+ //cp-specific parameters
+ private int _numThreads = 1;
- Lop valInput;
/**
* Constructor to perform a unary operation with 2 inputs
@@ -109,11 +112,12 @@ public class Unary extends Lop
* @param op
* @throws LopsException
*/
- public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, ExecType et)
+ public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, ExecType et, int numThreads)
throws LopsException
{
super(Lop.Type.UNARY, dt, vt);
init(input1, op, dt, vt, et);
+ _numThreads = numThreads;
}
public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt)
@@ -327,28 +331,46 @@ public class Unary extends Lop
"Instruction not defined for Unary operation: " + op);
}
}
+
+ /**
+ *
+ * @param op
+ * @return
+ */
+ public static boolean isCumulativeOp(OperationTypes op) {
+ return op==OperationTypes.CUMSUM
+ || op==OperationTypes.CUMPROD
+ || op==OperationTypes.CUMMIN
+ || op==OperationTypes.CUMMAX;
+ }
+
+ @Override
public String getInstructions(String input1, String output)
throws LopsException
{
+ //sanity check number of operands
+ if( getInputs().size() != 1 ) {
+ throw new LopsException(printErrorLocation() + "Invalid number of operands ("
+ + getInputs().size() + ") for an Unary opration: " + operation);
+ }
+
// Unary operators with one input
- if (this.getInputs().size() == 1) {
-
- StringBuilder sb = new StringBuilder();
- sb.append( getExecType() );
- sb.append( Lop.OPERAND_DELIMITOR );
- sb.append( getOpcode() );
- sb.append( OPERAND_DELIMITOR );
- sb.append( getInputs().get(0).prepInputOperand(input1));
+ StringBuilder sb = new StringBuilder();
+ sb.append( getExecType() );
+ sb.append( Lop.OPERAND_DELIMITOR );
+ sb.append( getOpcode() );
+ sb.append( OPERAND_DELIMITOR );
+ sb.append( getInputs().get(0).prepInputOperand(input1) );
+ sb.append( OPERAND_DELIMITOR );
+ sb.append( prepOutputOperand(output) );
+
+ //num threads for cumulative cp ops
+ if( getExecType() == ExecType.CP && isCumulativeOp(operation) ) {
sb.append( OPERAND_DELIMITOR );
- sb.append( this.prepOutputOperand(output));
-
- return sb.toString();
-
- } else {
- throw new LopsException(this.printErrorLocation() + "Invalid number of operands ("
- + this.getInputs().size() + ") for an Unary opration: "
- + operation);
+ sb.append( _numThreads );
}
+
+ return sb.toString();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 98fb6bb..ef6aab9 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -51,6 +51,7 @@ import org.apache.sysml.hops.LiteralOp;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.hops.ParameterizedBuiltinOp;
import org.apache.sysml.hops.ReorgOp;
+import org.apache.sysml.hops.UnaryOp;
import org.apache.sysml.hops.rewrite.HopRewriteUtils;
import org.apache.sysml.hops.rewrite.ProgramRewriteStatus;
import org.apache.sysml.hops.rewrite.ProgramRewriter;
@@ -1538,7 +1539,9 @@ public class OptimizerRuleBased extends Optimizer
if( ConfigurationManager.isParallelMatrixOperations()
&& h instanceof MultiThreadedHop //abop, datagenop, qop, paramop
&& !( h instanceof ParameterizedBuiltinOp //only paramop-grpagg
- && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG) )
+ && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG)
+ && !( h instanceof UnaryOp //only unaryop-cumulativeagg
+ && !((UnaryOp)h).isCumulativeUnaryOperation() ) )
{
MultiThreadedHop mhop = (MultiThreadedHop) h;
mhop.setMaxNumThreads(opsK); //set max constraint in hop
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
index 6c5ec95..ae0fd30 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
@@ -43,6 +43,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.functionobjects.And;
import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinFunctionCode;
import org.apache.sysml.runtime.functionobjects.CM;
import org.apache.sysml.runtime.functionobjects.Divide;
import org.apache.sysml.runtime.functionobjects.Equals;
@@ -79,6 +80,7 @@ import org.apache.sysml.runtime.matrix.operators.LeftScalarOperator;
import org.apache.sysml.runtime.matrix.operators.RightScalarOperator;
import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
+import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
public class InstructionUtils
@@ -505,6 +507,48 @@ public class InstructionUtils
/**
*
+ * @param uop
+ * @return
+ */
+ public static AggregateUnaryOperator parseCumulativeAggregateUnaryOperator(UnaryOperator uop)
+ {
+ Builtin f = (Builtin)uop.fn;
+
+ if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMSUM )
+ return parseCumulativeAggregateUnaryOperator("ucumack+") ;
+ else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMPROD )
+ return parseCumulativeAggregateUnaryOperator("ucumac*") ;
+ else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMIN )
+ return parseCumulativeAggregateUnaryOperator("ucumacmin") ;
+ else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMAX )
+ return parseCumulativeAggregateUnaryOperator("ucumacmax" ) ;
+
+ throw new RuntimeException("Unsupported cumulative aggregate unary operator: "+f.getBuiltinFunctionCode());
+ }
+
+ /**
+ *
+ * @param uop
+ * @return
+ */
+ public static AggregateUnaryOperator parseBasicCumulativeAggregateUnaryOperator(UnaryOperator uop)
+ {
+ Builtin f = (Builtin)uop.fn;
+
+ if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMSUM )
+ return parseBasicAggregateUnaryOperator("uack+") ;
+ else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMPROD )
+ return parseCumulativeAggregateUnaryOperator("uac*") ;
+ else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMIN )
+ return parseCumulativeAggregateUnaryOperator("uacmin") ;
+ else if( f.getBuiltinFunctionCode()==BuiltinFunctionCode.CUMMAX )
+ return parseCumulativeAggregateUnaryOperator("uacmax" ) ;
+
+ throw new RuntimeException("Unsupported cumulative aggregate unary operator: "+f.getBuiltinFunctionCode());
+ }
+
+ /**
+ *
* @param opcode
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
index 2980ef1..2e5bcdf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/BuiltinUnaryCPInstruction.java
@@ -19,6 +19,8 @@
package org.apache.sysml.runtime.instructions.cp;
+import java.util.Arrays;
+
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -57,14 +59,18 @@ public abstract class BuiltinUnaryCPInstruction extends UnaryCPInstruction
String opcode = null;
ValueFunction func = null;
- if( parts.length==4 ) //print or stop
+ //print or stop or cumulative aggregates
+ if( parts.length==4 )
{
opcode = parts[0];
in.split(parts[1]);
out.split(parts[2]);
func = Builtin.getBuiltinFnObject(opcode);
- return new ScalarBuiltinCPInstruction(new SimpleOperator(func), in, out, opcode, str);
+ if( Arrays.asList(new String[]{"ucumk+","ucum*","ucummin","ucummax"}).contains(opcode) )
+ return new MatrixBuiltinCPInstruction(new UnaryOperator(func,Integer.parseInt(parts[3])), in, out, opcode, str);
+ else
+ return new ScalarBuiltinCPInstruction(new SimpleOperator(func), in, out, opcode, str);
}
else //2+1, general case
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 2b90ca8..e965b7e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -21,9 +21,11 @@ package org.apache.sysml.runtime.matrix.data;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -42,6 +44,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceCol;
import org.apache.sysml.runtime.functionobjects.ReduceDiag;
import org.apache.sysml.runtime.functionobjects.ReduceRow;
import org.apache.sysml.runtime.functionobjects.ValueFunction;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
import org.apache.sysml.runtime.instructions.cp.KahanObject;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
@@ -50,8 +53,10 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator;
import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysml.runtime.util.DataConverter;
import org.apache.sysml.runtime.util.UtilFunctions;
+
/**
* MB:
* Library for matrix aggregations including ak+, uak+ for all
@@ -288,7 +293,7 @@ public class LibMatrixAgg
* @param uop
* @throws DMLRuntimeException
*/
- public static void aggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop)
+ public static MatrixBlock cumaggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop)
throws DMLRuntimeException
{
//prepare meta data
@@ -299,8 +304,7 @@ public class LibMatrixAgg
//filter empty input blocks (incl special handling for sparse-unsafe operations)
if( in.isEmptyBlock(false) ){
- aggregateUnaryMatrixEmpty(in, out, aggtype, null);
- return;
+ return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
}
//allocate output arrays (if required)
@@ -310,15 +314,106 @@ public class LibMatrixAgg
//Timing time = new Timing(true);
if( !in.sparse )
- aggregateUnaryMatrixDense(in, out, aggtype, uop.fn, null, 0, m);
+ cumaggregateUnaryMatrixDense(in, out, aggtype, uop.fn, null, 0, m);
else
- aggregateUnaryMatrixSparse(in, out, aggtype, uop.fn, null, 0, m);
+ cumaggregateUnaryMatrixSparse(in, out, aggtype, uop.fn, null, 0, m);
//cleanup output and change representation (if necessary)
out.recomputeNonZeros();
out.examSparsity();
//System.out.println("uop ("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms.");
+
+ return out;
+ }
+
+ /**
+ *
+ * @param in
+ * @param out
+ * @param uop
+ * @param k
+ * @return
+ * @throws DMLRuntimeException
+ */
+ public static MatrixBlock cumaggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop, int k)
+ throws DMLRuntimeException
+ {
+ //fall back to sequential version if necessary
+ if( k <= 1 || (long)in.rlen*in.clen < PAR_NUMCELL_THRESHOLD || in.rlen <= k
+ || out.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD ) {
+ return cumaggregateUnaryMatrix(in, out, uop);
+ }
+
+ //prepare meta data
+ AggType aggtype = getAggType(uop);
+ final int m = in.rlen;
+ final int m2 = out.rlen;
+ final int n2 = out.clen;
+ final int mk = aggtype==AggType.CUM_KAHAN_SUM?2:1;
+
+ //filter empty input blocks (incl special handling for sparse-unsafe operations)
+ if( in.isEmptyBlock(false) ){
+ return aggregateUnaryMatrixEmpty(in, out, aggtype, null);
+ }
+
+ //Timing time = new Timing(true);
+
+ //allocate output arrays (if required)
+ out.reset(m2, n2, false); //always dense
+ out.allocateDenseBlock();
+
+ //core multi-threaded unary aggregate computation
+ //(currently: always parallelization over number of rows)
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ int blklen = (int)(Math.ceil((double)m/k));
+
+ //step 1: compute aggregates per row partition
+ AggregateUnaryOperator uaop = InstructionUtils.parseBasicCumulativeAggregateUnaryOperator(uop);
+ AggType uaoptype = getAggType(uaop);
+ ArrayList<PartialAggTask> tasks = new ArrayList<PartialAggTask>();
+ for( int i=0; i<k & i*blklen<m; i++ )
+ tasks.add( new PartialAggTask(in, new MatrixBlock(mk,n2,false), uaoptype, uaop, i*blklen, Math.min((i+1)*blklen, m)) );
+ List<Future<Object>> taskret = pool.invokeAll(tasks);
+ for( Future<Object> task : taskret )
+ task.get(); //check for errors
+
+ //step 2: cumulative aggregate of partition aggregates
+ MatrixBlock tmp = new MatrixBlock(tasks.size(), n2, false);
+ for( int i=0; i<tasks.size(); i++ ) {
+ MatrixBlock row = tasks.get(i).getResult();
+ if( uaop.aggOp.correctionExists )
+ row.dropLastRowsOrColums(uaop.aggOp.correctionLocation);
+ tmp.leftIndexingOperations(row, i, i, 0, n2-1, tmp, true);
+ }
+ MatrixBlock tmp2 = cumaggregateUnaryMatrix(tmp, new MatrixBlock(tasks.size(), n2, false), uop);
+
+ //step 3: compute final cumulative aggregate
+ ArrayList<CumAggTask> tasks2 = new ArrayList<CumAggTask>();
+ for( int i=0; i<k & i*blklen<m; i++ ) {
+ double[] agg = (i==0)? new double[n2] :
+ DataConverter.convertToDoubleVector(tmp2.sliceOperations(i-1, i-1, 0, n2-1, new MatrixBlock()));
+ tasks2.add( new CumAggTask(in, agg, out, aggtype, uop, i*blklen, Math.min((i+1)*blklen, m)) );
+ }
+ List<Future<Long>> taskret2 = pool.invokeAll(tasks2);
+ pool.shutdown();
+
+ //step 4: aggregate nnz
+ out.nonZeros = 0;
+ for( Future<Long> task : taskret2 )
+ out.nonZeros += task.get();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ //cleanup output and change representation (if necessary)
+ out.examSparsity();
+
+ //System.out.println("uop k="+k+" ("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms.");
+
+ return out;
}
/**
@@ -1360,19 +1455,19 @@ public class LibMatrixAgg
{
KahanObject kbuff = new KahanObject(0, 0);
KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
- d_ucumkp(a, c, m, n, kbuff, kplus);
+ d_ucumkp(a, null, c, m, n, kbuff, kplus, rl, ru);
break;
}
case CUM_PROD: //CUMPROD
{
- d_ucumm(a, c, m, n);
+ d_ucumm(a, null, c, m, n, rl, ru);
break;
}
case CUM_MIN:
case CUM_MAX:
{
double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
- d_ucummxx(a, c, m, n, init, (Builtin)vFn);
+ d_ucummxx(a, null, c, m, n, init, (Builtin)vFn, rl, ru);
break;
}
case MIN:
@@ -1490,19 +1585,19 @@ public class LibMatrixAgg
{
KahanObject kbuff = new KahanObject(0, 0);
KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
- s_ucumkp(a, c, m, n, kbuff, kplus);
+ s_ucumkp(a, null, c, m, n, kbuff, kplus, rl, ru);
break;
}
case CUM_PROD: //CUMPROD
{
- s_ucumm(a, c, m, n);
+ s_ucumm(a, null, c, m, n, rl, ru);
break;
}
case CUM_MIN:
case CUM_MAX:
{
double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
- s_ucummxx(a, c, m, n, init, (Builtin)vFn);
+ s_ucummxx(a, null, c, m, n, init, (Builtin)vFn, rl, ru);
break;
}
case MIN:
@@ -1569,6 +1664,100 @@ public class LibMatrixAgg
throw new DMLRuntimeException("Unsupported aggregation type: "+optype);
}
}
+
+ /**
+ *
+ * @param in
+ * @param out
+ * @param optype
+ * @param vFn
+ * @param agg
+ * @param rl
+ * @param ru
+ * @throws DMLRuntimeException
+ */
+ private static void cumaggregateUnaryMatrixDense(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, double[] agg, int rl, int ru)
+ throws DMLRuntimeException
+ {
+ final int m = in.rlen;
+ final int n = in.clen;
+
+ double[] a = in.getDenseBlock();
+ double[] c = out.getDenseBlock();
+
+ switch( optype )
+ {
+ case CUM_KAHAN_SUM: //CUMSUM
+ {
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ d_ucumkp(a, agg, c, m, n, kbuff, kplus, rl, ru);
+ break;
+ }
+ case CUM_PROD: //CUMPROD
+ {
+ d_ucumm(a, agg, c, m, n, rl, ru);
+ break;
+ }
+ case CUM_MIN:
+ case CUM_MAX:
+ {
+ double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
+ d_ucummxx(a, agg, c, m, n, init, (Builtin)vFn, rl, ru);
+ break;
+ }
+
+ default:
+ throw new DMLRuntimeException("Unsupported cumulative aggregation type: "+optype);
+ }
+ }
+
+ /**
+ *
+ * @param in
+ * @param out
+ * @param optype
+ * @param vFn
+ * @param ixFn
+ * @param rl
+ * @param ru
+ * @throws DMLRuntimeException
+ */
+ private static void cumaggregateUnaryMatrixSparse(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, double[] agg, int rl, int ru)
+ throws DMLRuntimeException
+ {
+ final int m = in.rlen;
+ final int n = in.clen;
+
+ SparseBlock a = in.getSparseBlock();
+ double[] c = out.getDenseBlock();
+
+ switch( optype )
+ {
+ case CUM_KAHAN_SUM: //CUMSUM
+ {
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ s_ucumkp(a, agg, c, m, n, kbuff, kplus, rl, ru);
+ break;
+ }
+ case CUM_PROD: //CUMPROD
+ {
+ s_ucumm(a, agg, c, m, n, rl, ru);
+ break;
+ }
+ case CUM_MIN:
+ case CUM_MAX:
+ {
+ double init = Double.MAX_VALUE * ((optype==AggType.CUM_MAX)?-1:1);
+ s_ucummxx(a, agg, c, m, n, init, (Builtin)vFn, rl, ru);
+ break;
+ }
+
+ default:
+ throw new DMLRuntimeException("Unsupported cumulative aggregation type: "+optype);
+ }
+ }
/**
*
@@ -1578,7 +1767,7 @@ public class LibMatrixAgg
* @param ixFn
* @throws DMLRuntimeException
*/
- private static void aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock out, AggType optype, IndexFunction ixFn)
+ private static MatrixBlock aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock out, AggType optype, IndexFunction ixFn)
throws DMLRuntimeException
{
//do nothing for pseudo sparse-safe operations
@@ -1587,7 +1776,7 @@ public class LibMatrixAgg
|| optype == AggType.CUM_KAHAN_SUM || optype == AggType.CUM_PROD
|| optype == AggType.CUM_MIN || optype == AggType.CUM_MAX)
{
- return;
+ return out;
}
//compute result based on meta data only
@@ -1639,6 +1828,8 @@ public class LibMatrixAgg
default:
throw new DMLRuntimeException("Unsupported aggregation type: "+optype);
}
+
+ return out;
}
@@ -1785,13 +1976,15 @@ public class LibMatrixAgg
* @param kbuff
* @param kplus
*/
- private static void d_ucumkp( double[] a, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus )
+ private static void d_ucumkp( double[] a, double[] agg, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru )
{
//init current row sum/correction arrays w/ neutral 0
- double[] csums = new double[ 2*n ];
+ double[] csums = new double[ 2*n ];
+ if( agg != null )
+ System.arraycopy(agg, 0, csums, 0, n);
//scan once and compute prefix sums
- for( int i=0, aix=0; i<m; i++, aix+=n ) {
+ for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) {
sumAgg( a, csums, aix, 0, n, kbuff, kplus );
System.arraycopy(csums, 0, c, aix, n);
}
@@ -1807,14 +2000,15 @@ public class LibMatrixAgg
* @param kbuff
* @param kplus
*/
- private static void d_ucumm( double[] a, double[] c, int m, int n )
+ private static void d_ucumm( double[] a, double[] agg, double[] c, int m, int n, int rl, int ru )
{
//init current row product array w/ neutral 1
- double[] cprods = new double[ n ];
- Arrays.fill(cprods, 1);
+ double[] cprods = (agg!=null) ? agg : new double[ n ];
+ if( agg == null )
+ Arrays.fill(cprods, 1);
//scan once and compute prefix products
- for( int i=0, aix=0; i<m; i++, aix+=n ) {
+ for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) {
productAgg( a, cprods, aix, 0, n );
System.arraycopy(cprods, 0, c, aix, n);
}
@@ -1829,14 +2023,15 @@ public class LibMatrixAgg
* @param n
* @param builtin
*/
- private static void d_ucummxx( double[] a, double[] c, int m, int n, double init, Builtin builtin )
+ private static void d_ucummxx( double[] a, double[] agg, double[] c, int m, int n, double init, Builtin builtin, int rl, int ru )
{
//init current row min/max array w/ extreme value
- double[] cmxx = new double[ n ];
- Arrays.fill(cmxx, init);
+ double[] cmxx = (agg!=null) ? agg : new double[ n ];
+ if( agg == null )
+ Arrays.fill(cmxx, init);
//scan once and compute prefix min/max
- for( int i=0, aix=0; i<m; i++, aix+=n ) {
+ for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) {
builtinAgg( a, cmxx, aix, n, builtin );
System.arraycopy(cmxx, 0, c, aix, n);
}
@@ -2279,13 +2474,15 @@ public class LibMatrixAgg
* @param kbuff
* @param kplus
*/
- private static void s_ucumkp( SparseBlock a, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus )
+ private static void s_ucumkp( SparseBlock a, double[] agg, double[] c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru )
{
//init current row sum/correction arrays w/ neutral 0
double[] csums = new double[ 2*n ];
-
+ if( agg != null )
+ System.arraycopy(agg, 0, csums, 0, n);
+
//scan once and compute prefix sums
- for( int i=0, ix=0; i<m; i++, ix+=n ) {
+ for( int i=rl, ix=rl*n; i<ru; i++, ix+=n ) {
if( !a.isEmpty(i) )
sumAgg( a.values(i), csums, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplus );
@@ -2302,17 +2499,18 @@ public class LibMatrixAgg
* @param m
* @param n
*/
- private static void s_ucumm( SparseBlock a, double[] c, int m, int n )
+ private static void s_ucumm( SparseBlock a, double[] agg, double[] c, int m, int n, int rl, int ru )
{
//init current row prod arrays w/ neutral 1
- double[] cprod = new double[ n ];
- Arrays.fill(cprod, 1);
+ double[] cprod = (agg!=null) ? agg : new double[ n ];
+ if( agg == null )
+ Arrays.fill(cprod, 1);
//init count arrays (helper, see correction)
int[] cnt = new int[ n ];
//scan once and compute prefix products
- for( int i=0, ix=0; i<m; i++, ix+=n )
+ for( int i=rl, ix=rl*n; i<ru; i++, ix+=n )
{
//multiply row of non-zero elements
if( !a.isEmpty(i) ) {
@@ -2345,11 +2543,12 @@ public class LibMatrixAgg
* @param init
* @param builtin
*/
- private static void s_ucummxx( SparseBlock a, double[] c, int m, int n, double init, Builtin builtin )
+ private static void s_ucummxx( SparseBlock a, double[] agg, double[] c, int m, int n, double init, Builtin builtin, int rl, int ru )
{
//init current row min/max array w/ extreme value
- double[] cmxx = new double[ n ];
- Arrays.fill(cmxx, init);
+ double[] cmxx = (agg!=null) ? agg : new double[ n ];
+ if( agg == null )
+ Arrays.fill(cmxx, init);
//init count arrays (helper, see correction)
int[] cnt = new int[ n ];
@@ -3434,6 +3633,45 @@ public class LibMatrixAgg
return _ret;
}
}
+
+ /**
+ *
+ */
+ private static class CumAggTask implements Callable<Long>
+ {
+ private MatrixBlock _in = null;
+ private double[] _agg = null;
+ private MatrixBlock _ret = null;
+ private AggType _aggtype = null;
+ private UnaryOperator _uop = null;
+ private int _rl = -1;
+ private int _ru = -1;
+
+ protected CumAggTask( MatrixBlock in, double[] agg, MatrixBlock ret, AggType aggtype, UnaryOperator uop, int rl, int ru )
+ throws DMLRuntimeException
+ {
+ _in = in;
+ _agg = agg;
+ _ret = ret;
+ _aggtype = aggtype;
+ _uop = uop;
+ _rl = rl;
+ _ru = ru;
+ }
+
+ @Override
+ public Long call() throws DMLRuntimeException
+ {
+ //compute partial cumulative aggregate
+ if( !_in.sparse )
+ cumaggregateUnaryMatrixDense(_in, _ret, _aggtype, _uop.fn, _agg, _rl, _ru);
+ else
+ cumaggregateUnaryMatrixSparse(_in, _ret, _aggtype, _uop.fn, _agg, _rl, _ru);
+
+ //recompute partial non-zeros (ru exlusive)
+ return _ret.recomputeNonZeros(_rl, _ru-1, 0, _ret.getNumColumns()-1);
+ }
+ }
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 6116fab..06ed711 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -2823,7 +2823,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
if( LibMatrixAgg.isSupportedUnaryOperator(op) )
{
//e.g., cumsum/cumprod/cummin/cumax
- LibMatrixAgg.aggregateUnaryMatrix(this, ret, op);
+ if( op.getNumThreads() > 1 )
+ LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads());
+ else
+ LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1e8de49c/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
index 1a45794..2f65fb4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java
@@ -25,14 +25,20 @@ import org.apache.sysml.runtime.functionobjects.ValueFunction;
public class UnaryOperator extends Operator
{
-
private static final long serialVersionUID = 2441990876648978637L;
public ValueFunction fn;
- public UnaryOperator(ValueFunction p)
+ private int k; //num threads
+
+ public UnaryOperator(ValueFunction p) {
+ this(p, 1); //default single-threaded
+ }
+
+ public UnaryOperator(ValueFunction p, int numThreads)
{
fn = p;
sparseSafe = false;
+ k = numThreads;
if(fn instanceof Builtin)
{
@@ -47,4 +53,8 @@ public class UnaryOperator extends Operator
}
}
}
+
+ public int getNumThreads() {
+ return k;
+ }
}