You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/01 22:16:14 UTC
[1/3] incubator-systemml git commit: New in-memory reblock for spark binarycell reblock, incl rdd read
Repository: incubator-systemml
Updated Branches:
refs/heads/master 039d55c0a -> b308c09b9
New in-memory reblock for spark binarycell reblock, incl rdd read
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2e5b7245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2e5b7245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2e5b7245
Branch: refs/heads/master
Commit: 2e5b724521c728af5f5b7f3301e1316b0bc4968f
Parents: 039d55c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Dec 31 11:59:22 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Dec 31 11:59:22 2015 -0800
----------------------------------------------------------------------
.../apache/sysml/hops/recompile/Recompiler.java | 9 ++-
.../controlprogram/caching/MatrixObject.java | 10 +++-
.../context/SparkExecutionContext.java | 58 +++++++++++++++++++-
3 files changed, 70 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2e5b7245/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index 48de372..0f2add5 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -1931,10 +1931,13 @@ public class Recompiler
return false;
}
- //robustness for usage through mlcontext (key/values of input rdds are not
- //serializable for text; also bufferpool rdd read only supported for binaryblock)
+ //robustness for usage through mlcontext (key/values of input rdds are
+ //not serializable for text; also bufferpool rdd read only supported for
+ // binarycell and binaryblock)
MatrixFormatMetaData iimd = (MatrixFormatMetaData) in.getMetaData();
- if( in.getRDDHandle() != null && iimd.getInputInfo() != InputInfo.BinaryBlockInputInfo ) {
+ if( in.getRDDHandle() != null
+ && iimd.getInputInfo() != InputInfo.BinaryBlockInputInfo
+ && iimd.getInputInfo() != InputInfo.BinaryCellInputInfo ) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2e5b7245/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index b4306c3..8226f07 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -1346,6 +1346,7 @@ public class MatrixObject extends CacheableData
MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
+ InputInfo ii = iimd.getInputInfo();
MatrixBlock mb = null;
try
{
@@ -1362,7 +1363,8 @@ public class MatrixObject extends CacheableData
long nnz = mc.getNonZeros();
//guarded rdd collect
- if( !OptimizerUtils.checkSparkCollectMemoryBudget(rlen, clen, brlen, bclen, nnz, sizePinned.get()) ) {
+ if( ii == InputInfo.BinaryBlockInputInfo && //guarded collect not for binary cell
+ !OptimizerUtils.checkSparkCollectMemoryBudget(rlen, clen, brlen, bclen, nnz, sizePinned.get()) ) {
//write RDD to hdfs and read to prevent invalid collect mem consumption
//note: lazy, partition-at-a-time collect (toLocalIterator) was significantly slower
if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
@@ -1373,8 +1375,12 @@ public class MatrixObject extends CacheableData
}
mb = readMatrixFromHDFS(_hdfsFileName);
}
+ else if( ii == InputInfo.BinaryCellInputInfo ) {
+ //collect matrix block from binary block RDD
+ mb = SparkExecutionContext.toMatrixBlock(lrdd, rlen, clen, nnz);
+ }
else {
- //collect matrix block from RDD
+ //collect matrix block from binary cell RDD
mb = SparkExecutionContext.toMatrixBlock(lrdd, rlen, clen, brlen, bclen, nnz);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2e5b7245/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 2a072fc..2ee6041 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -506,8 +506,8 @@ public class SparkExecutionContext extends ExecutionContext
}
/**
- * Utility method for creating a single matrix block out of an RDD. Note that this collect call
- * might trigger execution of any pending transformations.
+ * Utility method for creating a single matrix block out of a binary block RDD.
+ * Note that this collect call might trigger execution of any pending transformations.
*
* NOTE: This is an unguarded utility function, which requires memory for both the output matrix
* and its collected, blocked representation.
@@ -578,6 +578,60 @@ public class SparkExecutionContext extends ExecutionContext
return out;
}
+ @SuppressWarnings("unchecked")
+ public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz)
+ throws DMLRuntimeException
+ {
+ return toMatrixBlock(
+ (JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(),
+ rlen, clen, nnz);
+ }
+
+ /**
+ * Utility method for creating a single matrix block out of a binary cell RDD.
+ * Note that this collect call might trigger execution of any pending transformations.
+ *
+ * @param rdd
+ * @param rlen
+ * @param clen
+ * @param nnz
+ * @return
+ * @throws DMLRuntimeException
+ */
+ public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz)
+ throws DMLRuntimeException
+ {
+ MatrixBlock out = null;
+
+ //determine target sparse/dense representation
+ long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
+ boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
+
+ //create output matrix block (w/ lazy allocation)
+ out = new MatrixBlock(rlen, clen, sparse);
+ List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect();
+
+ //copy blocks one-at-a-time into output matrix block
+ for( Tuple2<MatrixIndexes,MatrixCell> keyval : list )
+ {
+ //unpack index-block pair
+ MatrixIndexes ix = keyval._1();
+ MatrixCell cell = keyval._2();
+
+ //append cell to dense/sparse target in order to avoid shifting for sparse
+ //note: this append requires a final sort of sparse rows
+ out.appendValue((int)ix.getRowIndex()-1, (int)ix.getColumnIndex()-1, cell.getValue());
+ }
+
+ //post-processing output matrix
+ if( sparse )
+ out.sortSparseRows();
+ out.recomputeNonZeros();
+ out.examSparsity();
+
+ return out;
+ }
+
/**
*
* @param rdd
[3/3] incubator-systemml git commit: New broadcast-based spark grouped aggregate (compiler/runtime)
Posted by mb...@apache.org.
New broadcast-based spark grouped aggregate (compiler/runtime)
Includes various cleanups and a fix to use reduceByKey instead of groupByKey in the code
path for aggregate operators (e.g., sum()).
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b308c09b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b308c09b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b308c09b
Branch: refs/heads/master
Commit: b308c09b90b3e88e7a30a7d77cc1b3f06bf82cc7
Parents: 4b71654
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Jan 1 13:15:37 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Jan 1 13:15:37 2016 -0800
----------------------------------------------------------------------
.../sysml/hops/ParameterizedBuiltinOp.java | 8 +-
.../org/apache/sysml/lops/GroupedAggregate.java | 19 +++++
.../context/SparkExecutionContext.java | 16 ++++
.../ParameterizedBuiltinSPInstruction.java | 47 +++++++----
.../spark/functions/ExtractGroup.java | 82 ++++++++++++++++----
.../functions/PerformGroupByAggInCombiner.java | 27 ++++---
6 files changed, 153 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b308c09b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 99eebaa..44714d4 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -334,7 +334,13 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
}
else if(et == ExecType.SPARK)
{
- grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et);
+ //physical operator selection
+ Hop groups = getInput().get(_paramIndexMap.get(Statement.GAGG_GROUPS));
+ boolean broadcastGroups = (_paramIndexMap.get(Statement.GAGG_WEIGHTS) == null &&
+ OptimizerUtils.checkSparkBroadcastMemoryBudget( groups.getDim1(), groups.getDim2(),
+ groups.getRowsInBlock(), groups.getColsInBlock(), groups.getNnz()) );
+
+ grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, broadcastGroups);
grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
setRequiresReblock( true );
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b308c09b/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/GroupedAggregate.java b/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
index f54857d..d07cfe0 100644
--- a/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
+++ b/src/main/java/org/apache/sysml/lops/GroupedAggregate.java
@@ -40,6 +40,11 @@ public class GroupedAggregate extends Lop
public static final String COMBINEDINPUT = "combinedinput";
private boolean _weights = false;
+
+ //spark-specific parameters
+ private boolean _broadcastGroups = false;
+
+ //cp-specific parameters
private int _numThreads = 1;
/**
@@ -65,6 +70,14 @@ public class GroupedAggregate extends Lop
public GroupedAggregate(
HashMap<String, Lop> inputParameterLops,
+ DataType dt, ValueType vt, ExecType et, boolean broadcastGroups) {
+ super(Lop.Type.GroupedAgg, dt, vt);
+ init(inputParameterLops, dt, vt, et);
+ _broadcastGroups = broadcastGroups;
+ }
+
+ public GroupedAggregate(
+ HashMap<String, Lop> inputParameterLops,
DataType dt, ValueType vt, ExecType et, int k) {
super(Lop.Type.GroupedAgg, dt, vt);
init(inputParameterLops, dt, vt, et);
@@ -203,6 +216,12 @@ public class GroupedAggregate extends Lop
sb.append( Lop.NAME_VALUE_SEPARATOR );
sb.append( _numThreads );
}
+ else if( getExecType()==ExecType.SPARK ) {
+ sb.append( OPERAND_DELIMITOR );
+ sb.append( "broadcast" );
+ sb.append( Lop.NAME_VALUE_SEPARATOR );
+ sb.append( _broadcastGroups );
+ }
sb.append( OPERAND_DELIMITOR );
sb.append( prepOutputOperand(output));
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b308c09b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 2ee6041..b46a3af 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -876,6 +876,22 @@ public class SparkExecutionContext extends ExecutionContext
parent.addLineageChild( child );
}
+ /**
+ *
+ * @param varParent
+ * @param varChild
+ * @param broadcast
+ * @throws DMLRuntimeException
+ */
+ public void addLineage(String varParent, String varChild, boolean broadcast)
+ throws DMLRuntimeException
+ {
+ if( broadcast )
+ addLineageBroadcast(varParent, varChild);
+ else
+ addLineageRDD(varParent, varChild);
+ }
+
@Override
public void cleanupMatrixObject( MatrixObject mo )
throws DMLRuntimeException
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b308c09b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 257e174..505e232 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -44,7 +44,8 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
-import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroup;
+import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroup.ExtractGroupBroadcast;
+import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroup.ExtractGroupJoin;
import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroupNWeights;
import org.apache.sysml.runtime.instructions.spark.functions.PerformGroupByAggInCombiner;
import org.apache.sysml.runtime.instructions.spark.functions.PerformGroupByAggInReducer;
@@ -58,6 +59,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixCell;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.WeightedCell;
import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
+import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.CMOperator;
import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
import org.apache.sysml.runtime.matrix.operators.Operator;
@@ -177,13 +179,16 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
//opcode guaranteed to be a valid opcode (see parsing)
if ( opcode.equalsIgnoreCase("groupedagg") )
{
+ boolean broadcastGroups = Boolean.parseBoolean(params.get("broadcast"));
+
//get input rdd handle
+ String groupsVar = params.get(Statement.GAGG_GROUPS);
JavaPairRDD<MatrixIndexes,MatrixBlock> target = sec.getBinaryBlockRDDHandleForVariable( params.get(Statement.GAGG_TARGET) );
- JavaPairRDD<MatrixIndexes,MatrixBlock> groups = sec.getBinaryBlockRDDHandleForVariable( params.get(Statement.GAGG_GROUPS) );
+ JavaPairRDD<MatrixIndexes,MatrixBlock> groups = broadcastGroups ? null : sec.getBinaryBlockRDDHandleForVariable( groupsVar );
JavaPairRDD<MatrixIndexes,MatrixBlock> weights = null;
MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( params.get(Statement.GAGG_TARGET) );
- MatrixCharacteristics mc2 = sec.getMatrixCharacteristics( params.get(Statement.GAGG_GROUPS) );
+ MatrixCharacteristics mc2 = sec.getMatrixCharacteristics( groupsVar );
if(mc1.dimsKnown() && mc2.dimsKnown() && (mc1.getRows() != mc2.getRows() || mc2.getCols() !=1)) {
throw new DMLRuntimeException("Grouped Aggregate dimension mismatch between target and groups.");
}
@@ -195,7 +200,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
if ( params.get(Statement.GAGG_WEIGHTS) != null ) {
weights = sec.getBinaryBlockRDDHandleForVariable( params.get(Statement.GAGG_WEIGHTS) );
- MatrixCharacteristics mc3 = sec.getMatrixCharacteristics( params.get(Statement.GAGG_GROUPS) );
+ MatrixCharacteristics mc3 = sec.getMatrixCharacteristics( params.get(Statement.GAGG_WEIGHTS) );
if(mc1.dimsKnown() && mc3.dimsKnown() && (mc1.getRows() != mc3.getRows() || mc1.getCols() != mc3.getCols())) {
throw new DMLRuntimeException("Grouped Aggregate dimension mismatch between target, groups, and weights.");
}
@@ -205,19 +210,26 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
}
else //input vector or matrix
{
- long ngroups = -1;
- if ( params.get(Statement.GAGG_NUM_GROUPS) != null) {
- ngroups = (long) Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS));
- }
+ String ngroupsStr = params.get(Statement.GAGG_NUM_GROUPS);
+ long ngroups = (ngroupsStr != null) ? (long) Double.parseDouble(ngroupsStr) : -1;
- //replicate groups if necessary
- if( mc1.getNumColBlocks() > 1 ) {
- groups = groups.flatMapToPair(
+ //execute basic grouped aggregate (extract and preagg)
+ if( broadcastGroups ) {
+ PartitionedBroadcastMatrix pbm = sec.getBroadcastForVariable(groupsVar);
+ groupWeightedCells = target
+ .flatMapToPair(new ExtractGroupBroadcast(pbm, mc1.getColsPerBlock(), ngroups, _optr));
+ }
+ else { //general case
+
+ //replicate groups if necessary
+ if( mc1.getNumColBlocks() > 1 ) {
+ groups = groups.flatMapToPair(
new ReplicateVectorFunction(false, mc1.getNumColBlocks() ));
+ }
+
+ groupWeightedCells = groups.join(target)
+ .flatMapToPair(new ExtractGroupJoin(mc1.getColsPerBlock(), ngroups, _optr));
}
-
- groupWeightedCells = groups.join(target)
- .flatMapToPair(new ExtractGroup(mc1.getColsPerBlock(), ngroups, _optr));
}
// Step 2: Make sure we have brlen required while creating <MatrixIndexes, MatrixCell>
@@ -228,7 +240,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
// Step 3: Now perform grouped aggregate operation (either on combiner side or reducer side)
JavaPairRDD<MatrixIndexes, MatrixCell> out = null;
- if(_optr instanceof CMOperator && ((CMOperator) _optr).isPartialAggregateOperator() ) {
+ if(_optr instanceof CMOperator && ((CMOperator) _optr).isPartialAggregateOperator()
+ || _optr instanceof AggregateOperator ) {
out = groupWeightedCells.reduceByKey(new PerformGroupByAggInCombiner(_optr))
.mapValues(new CreateMatrixCell(brlen, _optr));
}
@@ -244,8 +257,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
//store output rdd handle
sec.setRDDHandleForVariable(output.getName(), out);
- sec.addLineageRDD(output.getName(), params.get(Statement.GAGG_TARGET) );
- sec.addLineageRDD(output.getName(), params.get(Statement.GAGG_GROUPS) );
+ sec.addLineageRDD( output.getName(), params.get(Statement.GAGG_TARGET) );
+ sec.addLineage( output.getName(), groupsVar, broadcastGroups );
if ( params.get(Statement.GAGG_WEIGHTS) != null ) {
sec.addLineageRDD(output.getName(), params.get(Statement.GAGG_WEIGHTS) );
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b308c09b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
index 283e710..e8c6dbe 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
@@ -19,6 +19,7 @@
package org.apache.sysml.runtime.instructions.spark.functions;
+import java.io.Serializable;
import java.util.ArrayList;
import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -26,6 +27,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.WeightedCell;
@@ -33,13 +35,13 @@ import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.util.UtilFunctions;
-public class ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>>, MatrixIndexes, WeightedCell> {
-
+public abstract class ExtractGroup implements Serializable
+{
private static final long serialVersionUID = -7059358143841229966L;
- private long _bclen = -1;
- private long _ngroups = -1;
- private Operator _op = null;
+ protected long _bclen = -1;
+ protected long _ngroups = -1;
+ protected Operator _op = null;
public ExtractGroup( long bclen, long ngroups, Operator op ) {
_bclen = bclen;
@@ -47,15 +49,16 @@ public class ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,T
_op = op;
}
- @Override
- public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call(
- Tuple2<MatrixIndexes, Tuple2<MatrixBlock, MatrixBlock>> arg)
- throws Exception
+ /**
+ *
+ * @param ix
+ * @param group
+ * @param target
+ * @return
+ * @throws Exception
+ */
+ protected Iterable<Tuple2<MatrixIndexes, WeightedCell>> execute(MatrixIndexes ix, MatrixBlock group, MatrixBlock target) throws Exception
{
- MatrixIndexes ix = arg._1;
- MatrixBlock group = arg._2._1;
- MatrixBlock target = arg._2._2;
-
//sanity check matching block dimensions
if(group.getNumRows() != target.getNumRows()) {
throw new Exception("The blocksize for group and target blocks are mismatched: " + group.getNumRows() + " != " + target.getNumRows());
@@ -102,6 +105,57 @@ public class ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,T
}
}
- return groupValuePairs;
+ return groupValuePairs;
+ }
+
+ /**
+ *
+ */
+ public static class ExtractGroupJoin extends ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>>, MatrixIndexes, WeightedCell>
+ {
+ private static final long serialVersionUID = 8890978615936560266L;
+
+ public ExtractGroupJoin(long bclen, long ngroups, Operator op) {
+ super(bclen, ngroups, op);
+ }
+
+ @Override
+ public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call(
+ Tuple2<MatrixIndexes, Tuple2<MatrixBlock, MatrixBlock>> arg)
+ throws Exception
+ {
+ MatrixIndexes ix = arg._1;
+ MatrixBlock group = arg._2._1;
+ MatrixBlock target = arg._2._2;
+
+ return execute(ix, group, target);
+ }
+ }
+
+ /**
+ *
+ */
+ public static class ExtractGroupBroadcast extends ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, WeightedCell>
+ {
+ private static final long serialVersionUID = 5709955602290131093L;
+
+ private PartitionedBroadcastMatrix _pbm = null;
+
+ public ExtractGroupBroadcast( PartitionedBroadcastMatrix pbm, long bclen, long ngroups, Operator op ) {
+ super(bclen, ngroups, op);
+ _pbm = pbm;
+ }
+
+ @Override
+ public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call(
+ Tuple2<MatrixIndexes, MatrixBlock> arg)
+ throws Exception
+ {
+ MatrixIndexes ix = arg._1;
+ MatrixBlock group = _pbm.getMatrixBlock((int)ix.getRowIndex(), 1);
+ MatrixBlock target = arg._2;
+
+ return execute(ix, group, target);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b308c09b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/PerformGroupByAggInCombiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/PerformGroupByAggInCombiner.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/PerformGroupByAggInCombiner.java
index 812ff15..8b0cbc8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/PerformGroupByAggInCombiner.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/PerformGroupByAggInCombiner.java
@@ -35,31 +35,30 @@ public class PerformGroupByAggInCombiner implements Function2<WeightedCell, Weig
private static final long serialVersionUID = -813530414567786509L;
- Operator op;
+ private Operator _op;
+
public PerformGroupByAggInCombiner(Operator op) {
- this.op = op;
+ _op = op;
}
@Override
- public WeightedCell call(WeightedCell value1, WeightedCell value2) throws Exception {
- return doAggregation(op, value1, value2);
- }
-
- public WeightedCell doAggregation(Operator op, WeightedCell value1, WeightedCell value2) throws DMLRuntimeException {
+ public WeightedCell call(WeightedCell value1, WeightedCell value2)
+ throws Exception
+ {
WeightedCell outCell = new WeightedCell();
CM_COV_Object cmObj = new CM_COV_Object();
- if(op instanceof CMOperator) //everything except sum
+ if(_op instanceof CMOperator) //everything except sum
{
- if( ((CMOperator) op).isPartialAggregateOperator() )
+ if( ((CMOperator) _op).isPartialAggregateOperator() )
{
cmObj.reset();
- CM lcmFn = CM.getCMFnObject(((CMOperator) op).aggOpType); // cmFn.get(key.getTag());
+ CM lcmFn = CM.getCMFnObject(((CMOperator) _op).aggOpType); // cmFn.get(key.getTag());
//partial aggregate cm operator
lcmFn.execute(cmObj, value1.getValue(), value1.getWeight());
lcmFn.execute(cmObj, value2.getValue(), value2.getWeight());
- outCell.setValue(cmObj.getRequiredPartialResult(op));
+ outCell.setValue(cmObj.getRequiredPartialResult(_op));
outCell.setWeight(cmObj.getWeight());
}
else //forward tuples to reducer
@@ -67,9 +66,9 @@ public class PerformGroupByAggInCombiner implements Function2<WeightedCell, Weig
throw new DMLRuntimeException("Incorrect usage, should have used PerformGroupByAggInReducer");
}
}
- else if(op instanceof AggregateOperator) //sum
+ else if(_op instanceof AggregateOperator) //sum
{
- AggregateOperator aggop=(AggregateOperator) op;
+ AggregateOperator aggop=(AggregateOperator) _op;
if( aggop.correctionExists ) {
KahanObject buffer=new KahanObject(aggop.initialValue, 0);
@@ -96,7 +95,7 @@ public class PerformGroupByAggInCombiner implements Function2<WeightedCell, Weig
}
}
else
- throw new DMLRuntimeException("Unsupported operator in grouped aggregate instruction:" + op);
+ throw new DMLRuntimeException("Unsupported operator in grouped aggregate instruction:" + _op);
return outCell;
}
[2/3] incubator-systemml git commit: Local pre-aggregation for spark grouped aggregate on sum w/o weights
Posted by mb...@apache.org.
Local pre-aggregation for spark grouped aggregate on sum w/o weights
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4b716540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4b716540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4b716540
Branch: refs/heads/master
Commit: 4b7165409ad83842f224cefa0e29dbec93fb9d75
Parents: 2e5b724
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Dec 31 13:25:11 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Dec 31 13:25:11 2015 -0800
----------------------------------------------------------------------
.../cp/ParameterizedBuiltinCPInstruction.java | 2 +-
.../ParameterizedBuiltinSPInstruction.java | 7 ++-
.../spark/functions/ExtractGroup.java | 57 +++++++++++++++-----
.../sysml/runtime/matrix/data/MatrixBlock.java | 5 +-
4 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index bd3153a..4dbc4a5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -166,7 +166,7 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
// compute the result
int k = Integer.parseInt(params.get("k")); //num threads
- MatrixBlock soresBlock = (MatrixBlock) (groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k));
+ MatrixBlock soresBlock = groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k);
ec.setMatrixOutput(output.getName(), soresBlock);
// release locks
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 75da3b9..257e174 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -205,6 +205,11 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
}
else //input vector or matrix
{
+ long ngroups = -1;
+ if ( params.get(Statement.GAGG_NUM_GROUPS) != null) {
+ ngroups = (long) Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS));
+ }
+
//replicate groups if necessary
if( mc1.getNumColBlocks() > 1 ) {
groups = groups.flatMapToPair(
@@ -212,7 +217,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
}
groupWeightedCells = groups.join(target)
- .flatMapToPair(new ExtractGroup(mc1.getColsPerBlock()));
+ .flatMapToPair(new ExtractGroup(mc1.getColsPerBlock(), ngroups, _optr));
}
// Step 2: Make sure we have brlen required while creating <MatrixIndexes, MatrixCell>
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
index 6259955..283e710 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
@@ -24,19 +24,27 @@ import java.util.ArrayList;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.WeightedCell;
+import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.util.UtilFunctions;
public class ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>>, MatrixIndexes, WeightedCell> {
private static final long serialVersionUID = -7059358143841229966L;
- private long _bclen = -1;
+ private long _bclen = -1;
+ private long _ngroups = -1;
+ private Operator _op = null;
- public ExtractGroup( long bclen ) {
+ public ExtractGroup( long bclen, long ngroups, Operator op ) {
_bclen = bclen;
+ _ngroups = ngroups;
+ _op = op;
}
@Override
@@ -56,19 +64,44 @@ public class ExtractGroup implements PairFlatMapFunction<Tuple2<MatrixIndexes,T
//output weighted cells
ArrayList<Tuple2<MatrixIndexes, WeightedCell>> groupValuePairs = new ArrayList<Tuple2<MatrixIndexes, WeightedCell>>();
long coloff = (ix.getColumnIndex()-1)*_bclen;
- for(int i = 0; i < group.getNumRows(); i++) {
- long groupVal = UtilFunctions.toLong(group.quickGetValue(i, 0));
- if(groupVal < 1) {
- throw new Exception("Expected group values to be greater than equal to 1 but found " + groupVal);
+
+ //local pre-aggregation for sum w/ known output dimensions
+ if(_op instanceof AggregateOperator && _ngroups > 0
+ && OptimizerUtils.isValidCPDimensions(_ngroups, target.getNumColumns()) )
+ {
+ MatrixBlock tmp = group.groupedAggOperations(target, null, new MatrixBlock(), (int)_ngroups, _op);
+
+ for(int i=0; i<tmp.getNumRows(); i++) {
+ for( int j=0; j<tmp.getNumColumns(); j++ ) {
+ double tmpval = tmp.quickGetValue(i, j);
+ if( tmpval != 0 ) {
+ WeightedCell weightedCell = new WeightedCell();
+ weightedCell.setValue(tmpval);
+ weightedCell.setWeight(1);
+ MatrixIndexes ixout = new MatrixIndexes(i+1,coloff+j+1);
+ groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+ }
+ }
}
- for( int j=0; j<target.getNumColumns(); j++ ) {
- WeightedCell weightedCell = new WeightedCell();
- weightedCell.setValue(target.quickGetValue(i, j));
- weightedCell.setWeight(1);
- MatrixIndexes ixout = new MatrixIndexes(groupVal,coloff+j+1);
- groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+ }
+ //general case without pre-aggregation
+ else
+ {
+ for(int i = 0; i < group.getNumRows(); i++) {
+ long groupVal = UtilFunctions.toLong(group.quickGetValue(i, 0));
+ if(groupVal < 1) {
+ throw new Exception("Expected group values to be greater than equal to 1 but found " + groupVal);
+ }
+ for( int j=0; j<target.getNumColumns(); j++ ) {
+ WeightedCell weightedCell = new WeightedCell();
+ weightedCell.setValue(target.quickGetValue(i, j));
+ weightedCell.setWeight(1);
+ MatrixIndexes ixout = new MatrixIndexes(groupVal,coloff+j+1);
+ groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+ }
}
}
+
return groupValuePairs;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index c620908..afc4788 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5285,7 +5285,7 @@ public class MatrixBlock extends MatrixValue implements Externalizable
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op)
+ public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op)
throws DMLRuntimeException, DMLUnsupportedOperationException
{
//single-threaded grouped aggregate
@@ -5304,11 +5304,10 @@ public class MatrixBlock extends MatrixValue implements Externalizable
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k)
+ public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k)
throws DMLRuntimeException, DMLUnsupportedOperationException
{
//setup input matrices
- // this <- groups
MatrixBlock target = checkType(tgt);
MatrixBlock weights = checkType(wghts);