You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/01 22:16:15 UTC
[2/3] incubator-systemml git commit: Local pre-aggregation for Spark grouped aggregate on sum without weights
Local pre-aggregation for Spark grouped aggregate on sum without weights
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4b716540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4b716540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4b716540
Branch: refs/heads/master
Commit: 4b7165409ad83842f224cefa0e29dbec93fb9d75
Parents: 2e5b724
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Dec 31 13:25:11 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Dec 31 13:25:11 2015 -0800
----------------------------------------------------------------------
.../cp/ParameterizedBuiltinCPInstruction.java | 2 +-
.../ParameterizedBuiltinSPInstruction.java | 7 ++-
.../spark/functions/ExtractGroup.java | 57 +++++++++++++++-----
.../sysml/runtime/matrix/data/MatrixBlock.java | 5 +-
4 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index bd3153a..4dbc4a5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -166,7 +166,7 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
// compute the result
int k = Integer.parseInt(params.get("k")); //num threads
- MatrixBlock soresBlock = (MatrixBlock) (groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k));
+ MatrixBlock soresBlock = groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k);
ec.setMatrixOutput(output.getName(), soresBlock);
// release locks
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 75da3b9..257e174 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -205,6 +205,11 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
}
else //input vector or matrix
{
+ long ngroups = -1;
+ if ( params.get(Statement.GAGG_NUM_GROUPS) != null) {
+ ngroups = (long) Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS));
+ }
+
//replicate groups if necessary
if( mc1.getNumColBlocks() > 1 ) {
groups = groups.flatMapToPair(
@@ -212,7 +217,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
}
groupWeightedCells = groups.join(target)
- .flatMapToPair(new ExtractGroup(mc1.getColsPerBlock()));
+ .flatMapToPair(new ExtractGroup(mc1.getColsPerBlock(), ngroups, _optr));
}
// Step 2: Make sure we have brlen required while creating <MatrixIndexes, MatrixCell>
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
index 6259955..283e710 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
@@ -24,19 +24,27 @@ import java.util.ArrayList;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.WeightedCell;
+import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.util.UtilFunctions;
public class ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>>, MatrixIndexes, WeightedCell> {
private static final long serialVersionUID = -7059358143841229966L;
- private long _bclen = -1;
+ private long _bclen = -1;
+ private long _ngroups = -1;
+ private Operator _op = null;
- public ExtractGroup( long bclen ) {
+ public ExtractGroup( long bclen, long ngroups, Operator op ) {
_bclen = bclen;
+ _ngroups = ngroups;
+ _op = op;
}
@Override
@@ -56,19 +64,44 @@ public class ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,T
//output weighted cells
ArrayList<Tuple2<MatrixIndexes, WeightedCell>> groupValuePairs = new ArrayList<Tuple2<MatrixIndexes, WeightedCell>>();
long coloff = (ix.getColumnIndex()-1)*_bclen;
- for(int i = 0; i < group.getNumRows(); i++) {
- long groupVal = UtilFunctions.toLong(group.quickGetValue(i, 0));
- if(groupVal < 1) {
- throw new Exception("Expected group values to be greater than equal to 1 but found " + groupVal);
+
+ //local pre-aggregation for sum w/ known output dimensions
+ if(_op instanceof AggregateOperator && _ngroups > 0
+ && OptimizerUtils.isValidCPDimensions(_ngroups, target.getNumColumns()) )
+ {
+ MatrixBlock tmp = group.groupedAggOperations(target, null, new MatrixBlock(), (int)_ngroups, _op);
+
+ for(int i=0; i<tmp.getNumRows(); i++) {
+ for( int j=0; j<tmp.getNumColumns(); j++ ) {
+ double tmpval = tmp.quickGetValue(i, j);
+ if( tmpval != 0 ) {
+ WeightedCell weightedCell = new WeightedCell();
+ weightedCell.setValue(tmpval);
+ weightedCell.setWeight(1);
+ MatrixIndexes ixout = new MatrixIndexes(i+1,coloff+j+1);
+ groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+ }
+ }
}
- for( int j=0; j<target.getNumColumns(); j++ ) {
- WeightedCell weightedCell = new WeightedCell();
- weightedCell.setValue(target.quickGetValue(i, j));
- weightedCell.setWeight(1);
- MatrixIndexes ixout = new MatrixIndexes(groupVal,coloff+j+1);
- groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+ }
+ //general case without pre-aggregation
+ else
+ {
+ for(int i = 0; i < group.getNumRows(); i++) {
+ long groupVal = UtilFunctions.toLong(group.quickGetValue(i, 0));
+ if(groupVal < 1) {
+ throw new Exception("Expected group values to be greater than equal to 1 but found " + groupVal);
+ }
+ for( int j=0; j<target.getNumColumns(); j++ ) {
+ WeightedCell weightedCell = new WeightedCell();
+ weightedCell.setValue(target.quickGetValue(i, j));
+ weightedCell.setWeight(1);
+ MatrixIndexes ixout = new MatrixIndexes(groupVal,coloff+j+1);
+ groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+ }
}
}
+
return groupValuePairs;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index c620908..afc4788 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5285,7 +5285,7 @@ public class MatrixBlock extends MatrixValue implements Externalizable
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op)
+ public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op)
throws DMLRuntimeException, DMLUnsupportedOperationException
{
//single-threaded grouped aggregate
@@ -5304,11 +5304,10 @@ public class MatrixBlock extends MatrixValue implements Externalizable
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k)
+ public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k)
throws DMLRuntimeException, DMLUnsupportedOperationException
{
//setup input matrices
- // this <- groups
MatrixBlock target = checkType(tgt);
MatrixBlock weights = checkType(wghts);