You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/01 22:16:15 UTC

[2/3] incubator-systemml git commit: Local pre-aggregation for spark grouped aggregate on sum w/o weights

Local pre-aggregation for spark grouped aggregate on sum w/o weights

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4b716540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4b716540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4b716540

Branch: refs/heads/master
Commit: 4b7165409ad83842f224cefa0e29dbec93fb9d75
Parents: 2e5b724
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Dec 31 13:25:11 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Dec 31 13:25:11 2015 -0800

----------------------------------------------------------------------
 .../cp/ParameterizedBuiltinCPInstruction.java   |  2 +-
 .../ParameterizedBuiltinSPInstruction.java      |  7 ++-
 .../spark/functions/ExtractGroup.java           | 57 +++++++++++++++-----
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  5 +-
 4 files changed, 54 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index bd3153a..4dbc4a5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -166,7 +166,7 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
 			
 			// compute the result
 			int k = Integer.parseInt(params.get("k")); //num threads
-			MatrixBlock soresBlock = (MatrixBlock) (groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k));
+			MatrixBlock soresBlock = groups.groupedAggOperations(target, weights, new MatrixBlock(), ngroups, _optr, k);
 			
 			ec.setMatrixOutput(output.getName(), soresBlock);
 			// release locks

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 75da3b9..257e174 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -205,6 +205,11 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 			}
 			else //input vector or matrix
 			{
+				long ngroups = -1;
+				if ( params.get(Statement.GAGG_NUM_GROUPS) != null) {
+					ngroups = (long) Double.parseDouble(params.get(Statement.GAGG_NUM_GROUPS));
+				}
+				
 				//replicate groups if necessary
 				if( mc1.getNumColBlocks() > 1 ) {
 					groups = groups.flatMapToPair(
@@ -212,7 +217,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 				}
 				
 				groupWeightedCells = groups.join(target)
-						.flatMapToPair(new ExtractGroup(mc1.getColsPerBlock()));
+						.flatMapToPair(new ExtractGroup(mc1.getColsPerBlock(), ngroups, _optr));
 			}
 			
 			// Step 2: Make sure we have brlen required while creating <MatrixIndexes, MatrixCell> 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
index 6259955..283e710 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
@@ -24,19 +24,27 @@ import java.util.ArrayList;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.WeightedCell;
+import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class ExtractGroup  implements PairFlatMapFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>>, MatrixIndexes, WeightedCell> {
 
 	private static final long serialVersionUID = -7059358143841229966L;
 
-	private long _bclen = -1;
+	private long _bclen = -1; 
+	private long _ngroups = -1; 
+	private Operator _op = null;
 	
-	public ExtractGroup( long bclen ) {
+	public ExtractGroup( long bclen, long ngroups, Operator op ) {
 		_bclen = bclen;
+		_ngroups = ngroups;
+		_op = op;
 	}
 	
 	@Override
@@ -56,19 +64,44 @@ public class ExtractGroup  implements PairFlatMapFunction<Tuple2<MatrixIndexes,T
 		//output weighted cells
 		ArrayList<Tuple2<MatrixIndexes, WeightedCell>> groupValuePairs = new ArrayList<Tuple2<MatrixIndexes, WeightedCell>>();
 		long coloff = (ix.getColumnIndex()-1)*_bclen;
-		for(int i = 0; i < group.getNumRows(); i++) {
-			long groupVal = UtilFunctions.toLong(group.quickGetValue(i, 0));
-			if(groupVal < 1) {
-				throw new Exception("Expected group values to be greater than equal to 1 but found " + groupVal);
+		
+		//local pre-aggregation for sum w/ known output dimensions
+		if(_op instanceof AggregateOperator && _ngroups > 0 
+			&& OptimizerUtils.isValidCPDimensions(_ngroups, target.getNumColumns()) ) 
+		{
+			MatrixBlock tmp = group.groupedAggOperations(target, null, new MatrixBlock(), (int)_ngroups, _op);
+			
+			for(int i=0; i<tmp.getNumRows(); i++) {
+				for( int j=0; j<tmp.getNumColumns(); j++ ) {
+					double tmpval = tmp.quickGetValue(i, j);
+					if( tmpval != 0 ) {
+						WeightedCell weightedCell = new WeightedCell();
+						weightedCell.setValue(tmpval);
+						weightedCell.setWeight(1);
+						MatrixIndexes ixout = new MatrixIndexes(i+1,coloff+j+1);
+						groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+					}
+				}
 			}
-			for( int j=0; j<target.getNumColumns(); j++ ) {
-				WeightedCell weightedCell = new WeightedCell();
-				weightedCell.setValue(target.quickGetValue(i, j));
-				weightedCell.setWeight(1);
-				MatrixIndexes ixout = new MatrixIndexes(groupVal,coloff+j+1);
-				groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+		}
+		//general case without pre-aggregation
+		else 
+		{
+			for(int i = 0; i < group.getNumRows(); i++) {
+				long groupVal = UtilFunctions.toLong(group.quickGetValue(i, 0));
+				if(groupVal < 1) {
+					throw new Exception("Expected group values to be greater than equal to 1 but found " + groupVal);
+				}
+				for( int j=0; j<target.getNumColumns(); j++ ) {
+					WeightedCell weightedCell = new WeightedCell();
+					weightedCell.setValue(target.quickGetValue(i, j));
+					weightedCell.setWeight(1);
+					MatrixIndexes ixout = new MatrixIndexes(groupVal,coloff+j+1);
+					groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ixout, weightedCell));
+				}
 			}
 		}
+		
 		return groupValuePairs;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4b716540/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index c620908..afc4788 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5285,7 +5285,7 @@ public class MatrixBlock extends MatrixValue implements Externalizable
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op) 
+	public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op) 
 		throws DMLRuntimeException, DMLUnsupportedOperationException 
 	{
 		//single-threaded grouped aggregate 
@@ -5304,11 +5304,10 @@ public class MatrixBlock extends MatrixValue implements Externalizable
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	public MatrixValue groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k) 
+	public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op, int k) 
 		throws DMLRuntimeException, DMLUnsupportedOperationException 		
 	{
 		//setup input matrices
-		// this <- groups
 		MatrixBlock target = checkType(tgt);
 		MatrixBlock weights = checkType(wghts);