You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text rendering.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/17 22:35:35 UTC

incubator-systemml git commit: [SYSTEMML-1255] New cp/spark column-wise ternary aggregates, tests

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 184e02dac -> 21b96855b


[SYSTEMML-1255] New cp/spark column-wise ternary aggregates, tests 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/21b96855
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/21b96855
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/21b96855

Branch: refs/heads/master
Commit: 21b96855b4e6f36b379cc3ebc5bb95435158d35a
Parents: 184e02d
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Feb 16 22:08:45 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Feb 17 13:55:02 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggUnaryOp.java  |  18 +-
 .../org/apache/sysml/lops/TernaryAggregate.java |  24 +-
 .../runtime/compress/CompressedMatrixBlock.java |   5 +-
 .../instructions/CPInstructionParser.java       |   3 +-
 .../runtime/instructions/InstructionUtils.java  |  16 ++
 .../instructions/SPInstructionParser.java       |   2 +-
 .../cp/AggregateTernaryCPInstruction.java       |  32 ++-
 .../spark/AggregateTernarySPInstruction.java    | 128 +++++----
 .../spark/AggregateUnarySPInstruction.java      |  28 +-
 .../spark/ComputationSPInstruction.java         |  27 ++
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 191 ++++++++-----
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  22 +-
 .../operators/AggregateTernaryOperator.java     |  57 ++++
 .../functions/ternary/TernaryAggregateTest.java | 267 +++++++++++++++++++
 .../functions/ternary/TernaryAggregateC.R       |  33 +++
 .../functions/ternary/TernaryAggregateC.dml     |  30 +++
 .../functions/ternary/TernaryAggregateRC.R      |  34 +++
 .../functions/ternary/TernaryAggregateRC.dml    |  31 +++
 .../functions/ternary/ZPackageSuite.java        |   3 +-
 19 files changed, 754 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
index 9964981..fce424c 100644
--- a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
@@ -119,7 +119,7 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop
 			if ( et == ExecType.CP ) 
 			{
 				Lop agg1 = null;
-				if( isTernaryAggregateRewriteApplicable() ) {
+				if( isTernaryAggregateRewriteApplicable(et) ) {
 					agg1 = constructLopsTernaryAggregateRewrite(et);
 				}
 				else if( isUnaryAggregateOuterCPRewriteApplicable() )
@@ -245,7 +245,7 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop
 				DirectionTypes dir = HopsDirection2Lops.get(_direction);
 
 				//unary aggregate
-				if( isTernaryAggregateRewriteApplicable() ) 
+				if( isTernaryAggregateRewriteApplicable(et) ) 
 				{
 					Lop aggregate = constructLopsTernaryAggregateRewrite(et);
 					setOutputDimensions(aggregate); //0x0 (scalar)
@@ -488,14 +488,15 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop
 			return SparkAggType.MULTI_BLOCK;
 	}
 
-	private boolean isTernaryAggregateRewriteApplicable() throws HopsException 
+	private boolean isTernaryAggregateRewriteApplicable(ExecType et) 
+		throws HopsException 
 	{
 		boolean ret = false;
 		
 		//currently we support only sum over binary multiply but potentially 
 		//it can be generalized to any RC aggregate over two common binary operations
-		if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES &&
-			_direction == Direction.RowCol && _op == AggOp.SUM ) 
+		if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES && _op == AggOp.SUM &&
+			(_direction == Direction.RowCol || _direction == Direction.Col)  ) 
 		{
 			Hop input1 = getInput().get(0);
 			if( input1.getParent().size() == 1 && //sum single consumer
@@ -639,7 +640,6 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop
 		Hop input11 = input1.getInput().get(0);
 		Hop input12 = input1.getInput().get(1);
 		
-		Lop ret = null;
 		Lop in1 = null;
 		Lop in2 = null;
 		Lop in3 = null;
@@ -668,10 +668,10 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop
 		// The execution type of a unary aggregate instruction should depend on the execution type of inputs to avoid OOM
 		// Since we only support matrix-vector and not vector-matrix, checking the execution type of input1 should suffice.
 		ExecType et_input = input1.optFindExecType();
-		ret = new TernaryAggregate(in1, in2, in3, Aggregate.OperationTypes.KahanSum, 
-				Binary.OperationTypes.MULTIPLY, DataType.SCALAR, ValueType.DOUBLE, et_input, k);
+		DirectionTypes dir = HopsDirection2Lops.get(_direction);
 		
-		return ret;
+		return new TernaryAggregate(in1, in2, in3, Aggregate.OperationTypes.KahanSum, 
+				Binary.OperationTypes.MULTIPLY, dir, getDataType(), ValueType.DOUBLE, et_input, k);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/TernaryAggregate.java b/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
index 0932424..08600e4 100644
--- a/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
+++ b/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
@@ -21,22 +21,24 @@ package org.apache.sysml.lops;
 
 import org.apache.sysml.lops.LopProperties.ExecLocation;
 import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.lops.PartialAggregate.DirectionTypes;
 import org.apache.sysml.lops.compile.JobType;
 import org.apache.sysml.parser.Expression.*;
 
 public class TernaryAggregate extends Lop 
 {
-	
-	private static final String OPCODE = "tak+*";
+	public static final String OPCODE_RC = "tak+*";
+	public static final String OPCODE_C = "tack+*";
 	
 	//NOTE: currently only used for ta+*
 	//private Aggregate.OperationTypes _aggOp = null;
 	//private Binary.OperationTypes _binOp = null;
+	private DirectionTypes _direction;
 	
 	//optional attribute for cp
 	private int _numThreads = -1;
 
-	public TernaryAggregate(Lop input1, Lop input2, Lop input3, Aggregate.OperationTypes aggOp, Binary.OperationTypes binOp, DataType dt, ValueType vt, ExecType et, int k ) 
+	public TernaryAggregate(Lop input1, Lop input2, Lop input3, Aggregate.OperationTypes aggOp, Binary.OperationTypes binOp, DirectionTypes direction, DataType dt, ValueType vt, ExecType et, int k ) 
 	{
 		super(Lop.Type.TernaryAggregate, dt, vt);
 		
@@ -50,6 +52,7 @@ public class TernaryAggregate extends Lop
 		input2.addOutput(this);
 		input3.addOutput(this);
 		
+		_direction = direction;
 		_numThreads = k;
 		
 		boolean breaksAlignment = false;
@@ -60,9 +63,8 @@ public class TernaryAggregate extends Lop
 	}
 	
 	@Override
-	public String toString()
-	{
-		return "Operation: "+OPCODE;		
+	public String toString() {
+		return "Operation: "+getOpCode();
 	}
 	
 	@Override
@@ -72,7 +74,7 @@ public class TernaryAggregate extends Lop
 		StringBuilder sb = new StringBuilder();
 		sb.append( getExecType() );
 		sb.append( OPERAND_DELIMITOR );
-		sb.append( OPCODE );
+		sb.append( getOpCode() );
 		sb.append( OPERAND_DELIMITOR );
 		sb.append( getInputs().get(0).prepInputOperand(input1));
 		sb.append( OPERAND_DELIMITOR );
@@ -89,4 +91,12 @@ public class TernaryAggregate extends Lop
 		
 		return sb.toString();
 	}
+	
+	private String getOpCode() {
+		switch( _direction ) {
+			case RowCol: return OPCODE_RC;
+			case Col: return OPCODE_C;
+			default: throw new RuntimeException("Unsupported aggregation direction: "+_direction);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 84c4812..e345fca 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -77,6 +77,7 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
@@ -1917,13 +1918,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	}
 
 	@Override
-	public ScalarObject aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, AggregateBinaryOperator op)
+	public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, MatrixBlock ret, AggregateTernaryOperator op, boolean inCP)
 			throws DMLRuntimeException {
 		printDecompressWarning("aggregateTernaryOperations");
 		MatrixBlock left = isCompressed() ? decompress() : this;
 		MatrixBlock right1 = getUncompressed(m2);
 		MatrixBlock right2 = getUncompressed(m3);
-		return left.aggregateTernaryOperations(left, right1, right2, op);
+		return left.aggregateTernaryOperations(left, right1, right2, ret, op, inCP);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
index 9cf56d9..f3c1605 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
@@ -78,7 +78,8 @@ public class CPInstructionParser extends InstructionParser
 		String2CPInstructionType = new HashMap<String, CPINSTRUCTION_TYPE>();
 
 		String2CPInstructionType.put( "ba+*"   	, CPINSTRUCTION_TYPE.AggregateBinary);
-		String2CPInstructionType.put( "tak+*"   	, CPINSTRUCTION_TYPE.AggregateTernary);
+		String2CPInstructionType.put( "tak+*"   , CPINSTRUCTION_TYPE.AggregateTernary);
+		String2CPInstructionType.put( "tack+*"  , CPINSTRUCTION_TYPE.AggregateTernary);
 		
 		String2CPInstructionType.put( "uak+"   	, CPINSTRUCTION_TYPE.AggregateUnary);
 		String2CPInstructionType.put( "uark+"   , CPINSTRUCTION_TYPE.AggregateUnary);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
index 6ef39f1..2a990d4 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
@@ -48,6 +48,7 @@ import org.apache.sysml.runtime.functionobjects.Divide;
 import org.apache.sysml.runtime.functionobjects.Equals;
 import org.apache.sysml.runtime.functionobjects.GreaterThan;
 import org.apache.sysml.runtime.functionobjects.GreaterThanEquals;
+import org.apache.sysml.runtime.functionobjects.IndexFunction;
 import org.apache.sysml.runtime.functionobjects.IntegerDivide;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
@@ -76,6 +77,7 @@ import org.apache.sysml.runtime.instructions.gpu.GPUInstruction.GPUINSTRUCTION_T
 import org.apache.sysml.runtime.instructions.mr.MRInstruction.MRINSTRUCTION_TYPE;
 import org.apache.sysml.runtime.instructions.spark.SPInstruction.SPINSTRUCTION_TYPE;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.LeftScalarOperator;
@@ -372,6 +374,20 @@ public class InstructionUtils
 		return aggun;
 	}
 
+	public static AggregateTernaryOperator parseAggregateTernaryOperator(String opcode) {
+		return parseAggregateTernaryOperator(opcode, 1);
+	}
+	
+	public static AggregateTernaryOperator parseAggregateTernaryOperator(String opcode, int numThreads) {
+		CorrectionLocationType corr = opcode.equalsIgnoreCase("tak+*") ? 
+				CorrectionLocationType.LASTCOLUMN : CorrectionLocationType.LASTROW;
+		AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, corr);
+		IndexFunction ixfun = opcode.equalsIgnoreCase("tak+*") ? 
+			ReduceAll.getReduceAllFnObject() : ReduceRow.getReduceRowFnObject();					
+		
+		return new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg, ixfun, numThreads);
+	}
+	
 	public static AggregateOperator parseAggregateOperator(String opcode, String corrExists, String corrLoc)
 	{
 		AggregateOperator agg = null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 9008f09..fa05de3 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -131,7 +131,7 @@ public class SPInstructionParser extends InstructionParser
 		
 		//ternary aggregate operators
 		String2SPInstructionType.put( "tak+*"      , SPINSTRUCTION_TYPE.AggregateTernary);
-
+		String2SPInstructionType.put( "tack+*"     , SPINSTRUCTION_TYPE.AggregateTernary);
 		
 		String2SPInstructionType.put( "rangeReIndex"   	, SPINSTRUCTION_TYPE.MatrixIndexing);
 		String2SPInstructionType.put( "leftIndex"   	, SPINSTRUCTION_TYPE.MatrixIndexing);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
index b88c062..147436e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
@@ -21,18 +21,15 @@ package org.apache.sysml.runtime.instructions.cp;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 
 public class AggregateTernaryCPInstruction extends ComputationCPInstruction
-{
-	public AggregateTernaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand in3, 
-                                         CPOperand out, String opcode, String istr  )
+{	
+	public AggregateTernaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, 
+		CPOperand in3, CPOperand out, String opcode, String istr  )
 	{
 		super(op, in1, in2, in3, out, opcode, istr);
 		_cptype = CPINSTRUCTION_TYPE.AggregateTernary;
@@ -44,7 +41,7 @@ public class AggregateTernaryCPInstruction extends ComputationCPInstruction
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
 		String opcode = parts[0];
 		
-		if ( opcode.equalsIgnoreCase("tak+*")) {
+		if ( opcode.equalsIgnoreCase("tak+*") || opcode.equalsIgnoreCase("tack+*") ) {
 			InstructionUtils.checkNumFields( parts, 5 );
 			
 			CPOperand in1 = new CPOperand(parts[1]);
@@ -52,16 +49,13 @@ public class AggregateTernaryCPInstruction extends ComputationCPInstruction
 			CPOperand in3 = new CPOperand(parts[3]);
 			CPOperand out = new CPOperand(parts[4]);
 			int numThreads = Integer.parseInt(parts[5]);
-				
-			AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject());
-			AggregateBinaryOperator op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg, numThreads);
 			
+			AggregateTernaryOperator op = InstructionUtils.parseAggregateTernaryOperator(opcode, numThreads);
 			return new AggregateTernaryCPInstruction(op, in1, in2, in3, out, opcode, str);
 		} 
 		else {
-			throw new DMLRuntimeException("AggregateTertiaryInstruction.parseInstruction():: Unknown opcode " + opcode);
-		}
-		
+			throw new DMLRuntimeException("AggregateTernaryInstruction.parseInstruction():: Unknown opcode " + opcode);
+		}		
 	}
 	
 	@Override
@@ -73,14 +67,18 @@ public class AggregateTernaryCPInstruction extends ComputationCPInstruction
         MatrixBlock matBlock3 = input3.isLiteral() ? null : //matrix or literal 1
         						ec.getMatrixInput(input3.getName());
 			
-		AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
-		ScalarObject ret = matBlock1.aggregateTernaryOperations(matBlock1, matBlock2, matBlock3, ab_op);
+		AggregateTernaryOperator ab_op = (AggregateTernaryOperator) _optr;
+		MatrixBlock ret = matBlock1.aggregateTernaryOperations(
+				matBlock1, matBlock2, matBlock3, new MatrixBlock(), ab_op, true);
 			
 		//release inputs/outputs
 		ec.releaseMatrixInput(input1.getName());
 		ec.releaseMatrixInput(input2.getName());
 		if( !input3.isLiteral() )
 			ec.releaseMatrixInput(input3.getName());
-		ec.setScalarOutput(output.getName(), ret);
+		if( output.getDataType().isScalar() )
+			ec.setScalarOutput(output.getName(), new DoubleObject(ret.quickGetValue(0, 0)));
+		else
+			ec.setMatrixOutput(output.getName(), ret);	
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
index 4a7b130..2a305be 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
@@ -20,32 +20,29 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.Multiply;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.cp.DoubleObject;
-import org.apache.sysml.runtime.instructions.cp.ScalarObject;
+import org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 
 public class AggregateTernarySPInstruction extends ComputationSPInstruction
 {
-	
-	public AggregateTernarySPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand in3, 
-			                             CPOperand out, String opcode, String istr )
+	public AggregateTernarySPInstruction(Operator op, CPOperand in1, CPOperand in2, 
+		CPOperand in3, CPOperand out, String opcode, String istr )
 	{
 		super(op, in1, in2, in3, out, opcode, istr);
 		_sptype = SPINSTRUCTION_TYPE.AggregateTernary;
@@ -57,21 +54,19 @@ public class AggregateTernarySPInstruction extends ComputationSPInstruction
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
 		String opcode = parts[0];
 		
-		if ( opcode.equalsIgnoreCase("tak+*")) {
-			InstructionUtils.checkNumFields ( parts, 4 );
+		if ( opcode.equalsIgnoreCase("tak+*") || opcode.equalsIgnoreCase("tack+*") ) {
+			InstructionUtils.checkNumFields( parts, 4 );
 			
 			CPOperand in1 = new CPOperand(parts[1]);
 			CPOperand in2 = new CPOperand(parts[2]);
 			CPOperand in3 = new CPOperand(parts[3]);
 			CPOperand out = new CPOperand(parts[4]);
-
-			AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject());
-			AggregateBinaryOperator op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
 			
+			AggregateTernaryOperator op = InstructionUtils.parseAggregateTernaryOperator(opcode);
 			return new AggregateTernarySPInstruction(op, in1, in2, in3, out, opcode, str);
 		} 
 		else {
-			throw new DMLRuntimeException("AggregateTertiaryInstruction.parseInstruction():: Unknown opcode " + opcode);
+			throw new DMLRuntimeException("AggregateTernaryInstruction.parseInstruction():: Unknown opcode " + opcode);
 		}
 	}
 	
@@ -81,88 +76,109 @@ public class AggregateTernarySPInstruction extends ComputationSPInstruction
 	{	
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 		
-		//get input
+		//get inputs
+		MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( input1.getName() );
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = input3.isLiteral() ? null : //matrix or literal 1
 													 sec.getBinaryBlockRDDHandleForVariable( input3.getName() );
 		
 		//execute aggregate ternary operation
-		AggregateBinaryOperator aggop = (AggregateBinaryOperator) _optr;
-		JavaRDD<MatrixBlock> out = null;
+		AggregateTernaryOperator aggop = (AggregateTernaryOperator) _optr;
+		JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
 		if( in3 != null ) { //3 inputs
-			out = in1.join( in2 ).join( in3 ).values()
-				     .map(new RDDAggregateTernaryFunction(aggop));
+			out = in1.join( in2 ).join( in3 )
+				     .mapToPair(new RDDAggregateTernaryFunction(aggop));
 		}
 		else { //2 inputs (third is literal 1)
-			out = in1.join( in2 ).values()
-					 .map(new RDDAggregateTernaryFunction2(aggop));				
+			out = in1.join( in2 )
+					 .mapToPair(new RDDAggregateTernaryFunction2(aggop));				
+		}
+		
+		//aggregate partial results
+		if( aggop.indexFn instanceof ReduceAll ) //tak+*
+		{
+			//aggregate and create output (no lineage because scalar)	   
+			MatrixBlock tmp = RDDAggregateUtils.sumStable(out.values());
+			DoubleObject ret = new DoubleObject(tmp.getValue(0, 0));
+			sec.setVariable(output.getName(), ret);	
+		}
+		else if( mcIn.dimsKnown() && mcIn.getCols()<=mcIn.getColsPerBlock() ) //tack+* single block
+		{
+			//single block aggregation and drop correction
+			MatrixBlock ret = RDDAggregateUtils.aggStable(out, aggop.aggOp);
+			ret.dropLastRowsOrColums(aggop.aggOp.correctionLocation);
+			
+			//put output block into symbol table (no lineage because single block)
+			//this also includes implicit maintenance of matrix characteristics
+			sec.setMatrixOutput(output.getName(), ret);		
+		}
+		else //tack+* multi block
+		{
+			//multi-block aggregation and drop correction
+			out = RDDAggregateUtils.aggByKeyStable(out, aggop.aggOp);
+			out = out.mapValues( new AggregateDropCorrectionFunction(aggop.aggOp) );
+
+			//put output RDD handle into symbol table
+			updateUnaryAggOutputMatrixCharacteristics(sec, aggop.indexFn);
+			sec.setRDDHandleForVariable(output.getName(), out);	
+			sec.addLineageRDD(output.getName(), input1.getName());
+			sec.addLineageRDD(output.getName(), input2.getName());
+			if( in3 != null )
+				sec.addLineageRDD(output.getName(), input3.getName());
 		}
-				
-		//aggregate and create output (no lineage because scalar)	   
-		MatrixBlock tmp = RDDAggregateUtils.sumStable(out);
-		DoubleObject ret = new DoubleObject(tmp.getValue(0, 0));
-		sec.setVariable(output.getName(), ret);
 	}
 
 	private static class RDDAggregateTernaryFunction 
-		implements Function<Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>, MatrixBlock>
+		implements PairFunction<Tuple2<MatrixIndexes, Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>>, MatrixIndexes, MatrixBlock>
 	{
 		private static final long serialVersionUID = 6410232464410434210L;
 		
-		private AggregateBinaryOperator _aggop = null;
+		private final AggregateTernaryOperator _aggop;
 		
-		public RDDAggregateTernaryFunction( AggregateBinaryOperator aggop ) 
-		{
-			_aggop = aggop;		
+		public RDDAggregateTernaryFunction( AggregateTernaryOperator aggop ) {
+			_aggop = aggop;
 		}
 	
 		@Override
-		public MatrixBlock call(Tuple2<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> arg0)
+		public Tuple2<MatrixIndexes,MatrixBlock> call(Tuple2<MatrixIndexes,Tuple2<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock>> arg0)
 			throws Exception 
 		{
 			//get inputs
-			MatrixBlock in1 = arg0._1()._1();
-			MatrixBlock in2 = arg0._1()._2();
-			MatrixBlock in3 = arg0._2();
+			MatrixIndexes ix = arg0._1();
+			MatrixBlock in1 = arg0._2()._1()._1();
+			MatrixBlock in2 = arg0._2()._1()._2();
+			MatrixBlock in3 = arg0._2()._2();
 			
 			//execute aggregate ternary operation
-			ScalarObject ret = in1.aggregateTernaryOperations(in1, in2, in3, _aggop);
-			
-			//create output matrix block (w/ correction)
-			MatrixBlock out = new MatrixBlock(2,1,false);
-			out.quickSetValue(0, 0, ret.getDoubleValue());
-			return out;
+			return new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(1, ix.getColumnIndex()),
+				in1.aggregateTernaryOperations(in1, in2, in3, new MatrixBlock(), _aggop, false));
 		}
 	}
 
 	private static class RDDAggregateTernaryFunction2 
-		implements Function<Tuple2<MatrixBlock,MatrixBlock>, MatrixBlock>
+		implements PairFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>>, MatrixIndexes, MatrixBlock>
 	{
 		private static final long serialVersionUID = -6615412819746331700L;
 		
-		private AggregateBinaryOperator _aggop = null;
+		private final AggregateTernaryOperator _aggop;
 		
-		public RDDAggregateTernaryFunction2( AggregateBinaryOperator aggop ) 
-		{
+		public RDDAggregateTernaryFunction2( AggregateTernaryOperator aggop ) {
 			_aggop = aggop;		
 		}
 	
 		@Override
-		public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
+		public Tuple2<MatrixIndexes,MatrixBlock> call(Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>> arg0)
 			throws Exception 
 		{
 			//get inputs
-			MatrixBlock in1 = arg0._1();
-			MatrixBlock in2 = arg0._2();
+			MatrixIndexes ix = arg0._1();
+			MatrixBlock in1 = arg0._2()._1();
+			MatrixBlock in2 = arg0._2()._2();
 			
 			//execute aggregate ternary operation
-			ScalarObject ret = in1.aggregateTernaryOperations(in1, in2, null, _aggop);
-			
-			//create output matrix block (w/ correction)
-			MatrixBlock out = new MatrixBlock(2,1,false);
-			out.quickSetValue(0, 0, ret.getDoubleValue());
-			return out;
+			return new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(1, ix.getColumnIndex()),
+				in1.aggregateTernaryOperations(in1, in2, null, new MatrixBlock(), _aggop, false));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
index 54ca925..eb9324f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
@@ -31,9 +31,6 @@ import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.ReduceAll;
-import org.apache.sysml.runtime.functionobjects.ReduceCol;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
@@ -130,35 +127,12 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction
 			}
 			
 			//put output RDD handle into symbol table
-			updateUnaryAggOutputMatrixCharacteristics(sec);
+			updateUnaryAggOutputMatrixCharacteristics(sec, auop.indexFn);
 			sec.setRDDHandleForVariable(output.getName(), out);	
 			sec.addLineageRDD(output.getName(), input1.getName());
 		}		
 	}
 
-	protected void updateUnaryAggOutputMatrixCharacteristics(SparkExecutionContext sec) 
-		throws DMLRuntimeException
-	{
-		AggregateUnaryOperator auop = (AggregateUnaryOperator)_optr;
-		
-		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
-		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
-		if(!mcOut.dimsKnown()) {
-			if(!mc1.dimsKnown()) {
-				throw new DMLRuntimeException("The output dimensions are not specified and cannot be inferred from input:" + mc1.toString() + " " + mcOut.toString());
-			}
-			else {
-				//infer statistics from input based on operator
-				if( auop.indexFn instanceof ReduceAll )
-					mcOut.set(1, 1, mc1.getRowsPerBlock(), mc1.getColsPerBlock());
-				else if (auop.indexFn instanceof ReduceCol)
-					mcOut.set(mc1.getRows(), 1, mc1.getRowsPerBlock(), mc1.getColsPerBlock());
-				else if (auop.indexFn instanceof ReduceRow)
-					mcOut.set(1, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock());
-			}
-		}
-	}
-
 	private static class RDDUAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
 	{
 		private static final long serialVersionUID = 2672082409287856038L;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
index 7a152a6..3d26b4c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
@@ -21,6 +21,10 @@ package org.apache.sysml.runtime.instructions.spark;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.IndexFunction;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.operators.Operator;
@@ -87,4 +91,27 @@ public abstract class ComputationSPInstruction extends SPInstruction {
 				sec.getMatrixCharacteristics(output.getName()).set(mcIn1.getRows(), mcIn1.getCols(), mcIn1.getRowsPerBlock(), mcIn1.getRowsPerBlock());
 		}
 	}
+	
+	/**
+	 * Infers the output matrix characteristics of a unary-style aggregate over input1,
+	 * if the output dimensions are not yet known. The output inherits the input blocking.
+	 * 
+	 * @param sec spark execution context holding the matrix characteristics
+	 * @param ixFn index function of the aggregate (ReduceAll, ReduceCol, or ReduceRow)
+	 * @throws DMLRuntimeException if neither output nor input dimensions are known
+	 */
+	protected void updateUnaryAggOutputMatrixCharacteristics(SparkExecutionContext sec, IndexFunction ixFn) 
+		throws DMLRuntimeException
+	{
+		MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(input1.getName());
+		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+		
+		//guard: nothing to infer if the output dimensions are already known
+		if( mcOut.dimsKnown() )
+			return;
+		
+		//guard: inference requires known input dimensions
+		if( !mcIn.dimsKnown() ) {
+			String msg = "The output dimensions are not specified and "
+				+ "cannot be inferred from input:" + mcIn.toString() + " " + mcOut.toString();
+			throw new DMLRuntimeException(msg);
+		}
+		
+		//derive the output shape from the aggregation direction
+		if( ixFn instanceof ReduceAll )        //full aggregate: 1 x 1
+			mcOut.set(1, 1, mcIn.getRowsPerBlock(), mcIn.getColsPerBlock());
+		else if( ixFn instanceof ReduceCol )   //one value per row: nrow x 1
+			mcOut.set(mcIn.getRows(), 1, mcIn.getRowsPerBlock(), mcIn.getColsPerBlock());
+		else if( ixFn instanceof ReduceRow )   //one value per column: 1 x ncol
+			mcOut.set(1, mcIn.getCols(), mcIn.getRowsPerBlock(), mcIn.getColsPerBlock());
+		//any other index function leaves the output characteristics untouched
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 914480b..404f440 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.matrix.data;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -48,6 +49,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
@@ -388,64 +390,76 @@ public class LibMatrixAgg
 		return out;
 	}
 
-	public static double aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3)
+	/**
+	 * Sequential ternary aggregate: Kahan sum of the cell-wise products in1*in2*in3
+	 * (in3==null is treated as a literal 1), written into the caller-allocated output ret.
+	 * For ReduceAll, ret receives the sum in (0,0) and its Kahan correction in (0,1);
+	 * otherwise ret receives per-column sums in row 0 and their corrections in row 1
+	 * (assumes ret was sized accordingly by the caller).
+	 * 
+	 * @param in1 first input
+	 * @param in2 second input
+	 * @param in3 third input, or null for a literal 1
+	 * @param ret output block; its dimensions are kept, its content is reset
+	 * @param op ternary aggregate operator (only op.indexFn is used here)
+	 * @return ret (unchanged if any input is empty)
+	 * @throws DMLRuntimeException declared for consistency with the parallel variant
+	 */
+	public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op) 
+		throws DMLRuntimeException
 	{
 		//early abort if any block is empty
 		if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || in3!=null&&in3.isEmptyBlock(false) ) {
-			return 0;
+			return ret;
 		}
-				
+		
 		//Timing time = new Timing(true);
 		
-		double val = -1;
+		//allocate output arrays (if required)
+		ret.reset(ret.rlen, ret.clen, false); //always dense
+		ret.allocateDenseBlock();
+		
+		IndexFunction ixFn = op.indexFn;
+		//dense kernel only if all (non-null) inputs are dense, otherwise sparse-driven kernel
 		if( !in1.sparse && !in2.sparse && (in3==null||!in3.sparse) ) //DENSE
-			val = aggregateTernaryDense(in1, in2, in3, 0, in1.rlen);
+			aggregateTernaryDense(in1, in2, in3, ret, ixFn, 0, in1.rlen);
 		else //GENERAL CASE
-			val = aggregateTernaryGeneric(in1, in2, in3, 0, in1.rlen);
+			aggregateTernaryGeneric(in1, in2, in3, ret, ixFn, 0, in1.rlen);
 		
-		//System.out.println("tak+ ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms.");
+		//cleanup output and change representation (if necessary)
+		ret.recomputeNonZeros();
+		ret.examSparsity();
 		
-		return val;			
+		//System.out.println("tak+ ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms.");
+	
+		return ret;
 	}
 
-	public static double aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int k) 
+	/**
+	 * Multi-threaded ternary aggregate. The rows of the inputs are split into at most k
+	 * contiguous row ranges, each task computes a thread-local partial result, and the
+	 * partials are combined into ret via op.aggOp. Falls back to the sequential variant
+	 * if k&lt;=1 or the inputs/intermediates make parallelization unattractive.
+	 * 
+	 * @param in1 first input
+	 * @param in2 second input
+	 * @param in3 third input, or null for a literal 1
+	 * @param ret output block (dimensions determined by the caller)
+	 * @param op ternary aggregate operator
+	 * @param k degree of parallelism
+	 * @return ret (unchanged if any input is empty)
+	 * @throws DMLRuntimeException wrapping any failure of the parallel execution
+	 */
+	public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op, int k) 
 		throws DMLRuntimeException
 	{		
 		//fall back to sequential version if necessary
-		if( k <= 1 || in1.rlen/3 < PAR_NUMCELL_THRESHOLD ) {
-			return aggregateTernary(in1, in2, in3);
+		//(single thread, too few non-zeros in in1/in2, at most k/2 rows, or - unless ReduceCol -
+		//the estimate ret.clen*8*k for the thread-local partials exceeds the intermediate threshold)
+		if( k <= 1 || in1.nonZeros+in2.nonZeros < PAR_NUMCELL_THRESHOLD || in1.rlen <= k/2 
+			|| (!(op.indexFn instanceof ReduceCol) &&  ret.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD) ) {
+			return aggregateTernary(in1, in2, in3, ret, op);
 		}
 		
 		//early abort if any block is empty
 		if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || in3!=null&&in3.isEmptyBlock(false) ) {
-			return 0;
+			return ret;
 		}
 			
 		//Timing time = new Timing(true);
 		
-		double val = -1;
 		try {
 			ExecutorService pool = Executors.newFixedThreadPool( k );
 			ArrayList<AggTernaryTask> tasks = new ArrayList<AggTernaryTask>();
 			int blklen = (int)(Math.ceil((double)in1.rlen/k));
+			IndexFunction ixFn = op.indexFn;
+			//one task per contiguous range of blklen rows (at most k tasks)
 			for( int i=0; i<k & i*blklen<in1.rlen; i++ )
-				tasks.add( new AggTernaryTask(in1, in2, in3, i*blklen, Math.min((i+1)*blklen, in1.rlen)));
-			pool.invokeAll(tasks);	
+				tasks.add( new AggTernaryTask(in1, in2, in3, ret, ixFn, i*blklen, Math.min((i+1)*blklen, in1.rlen)));
+			List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);	
+			//NOTE(review): if invokeAll throws, the shutdown below is skipped and the pool's
+			//threads are not released; consider shutting down the pool in a finally block
 			pool.shutdown();
-			//aggregate partial results
-			KahanObject kbuff = new KahanObject(0, 0);
-			KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
-			for( AggTernaryTask task : tasks )
-				kplus.execute2(kbuff, task.getResult());
-			val = kbuff._sum;
+			//aggregate partial results and error handling
+			//(Future.get rethrows task failures, which are wrapped into DMLRuntimeException below)
+			ret.copy(rtasks.get(0).get()); //for init
+			for( int i=1; i<rtasks.size(); i++ )
+				aggregateFinalResult(op.aggOp, ret, rtasks.get(i).get());
 		}
 		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 		
-		//System.out.println("tak+ k="+k+" ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms.");
+		//cleanup output and change representation (if necessary)
+		ret.recomputeNonZeros();
+		ret.examSparsity();
 		
-		return val;			
+		//System.out.println("tak+ k="+k+" ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms.");	
+	
+		return ret;
 	}
 
 	public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op) 
@@ -642,49 +656,70 @@ public class LibMatrixAgg
 			out.binaryOperationsInPlace(laop.increOp, partout);
 	}
 
-	private static double aggregateTernaryDense(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int rl, int ru)
+	/**
+	 * Dense kernel of the ternary aggregate over rows [rl, ru). Requires all non-null
+	 * inputs in dense representation; in3==null is treated as a literal 1.
+	 * For ReduceAll (tak+*), writes the Kahan sum and correction into ret (0,0) and (0,1).
+	 * Otherwise (tack+*), accumulates into ret's dense array, with per-column sums at
+	 * [0,n) and their Kahan corrections at [n,2n).
+	 * 
+	 * @param in1 first input (dense)
+	 * @param in2 second input (dense)
+	 * @param in3 third input (dense), or null for a literal 1
+	 * @param ret allocated dense output block
+	 * @param ixFn aggregation direction
+	 * @param rl row lower bound (inclusive)
+	 * @param ru row upper bound (exclusive)
+	 */
+	private static void aggregateTernaryDense(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru)
 	{
 		//compute block operations
 		KahanObject kbuff = new KahanObject(0, 0);
 		KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
 		
 		double[] a = in1.denseBlock;
-		double[] b = in2.denseBlock;
+		double[] b1 = in2.denseBlock;
+		double[] b2 = (in3!=null) ? in3.denseBlock : null; //if null, literal 1
 		final int n = in1.clen;
 		
-		if( in3 != null ) //3 inputs
+		if( ixFn instanceof ReduceAll ) //tak+*
 		{
-			double[] c = in3.denseBlock;
-			
 			for( int i=rl, ix=rl*n; i<ru; i++ ) 
 				for( int j=0; j<n; j++, ix++ ) {
-					double val = a[ix] * b[ix] * c[ix];
+					double b2val = (b2 != null) ? b2[ix] : 1;
+					double val = a[ix] * b1[ix] * b2val;
 					kplus.execute2( kbuff, val );
 				}
+			//single sum plus its Kahan correction
+			ret.quickSetValue(0, 0, kbuff._sum);
+			ret.quickSetValue(0, 1, kbuff._correction);
 		}
-		else //2 inputs (third: literal 1)
+		else //tack+*
 		{
-			for( int i=rl, ix=rl*n; i<ru; i++ ) 
+			double[] c = ret.getDenseBlock();
+			//c holds the column sums in [0,n) and their Kahan corrections in [n,2n)
+			for( int i=rl, ix=rl*n; i<ru; i++ )
 				for( int j=0; j<n; j++, ix++ ) {
-					double val = a[ix] * b[ix];
-					kplus.execute2( kbuff, val );
+					double b2val = (b2 != null) ? b2[ix] : 1;
+					double val = a[ix] * b1[ix] * b2val;
+					//load the running (sum, correction) of column j, add, and store back
+					kbuff._sum = c[j];
+					kbuff._correction = c[j+n];
+					kplus.execute2(kbuff, val);
+					c[j] = kbuff._sum;
+					c[j+n] = kbuff._correction;
 				}
 		}
-		
-		return kbuff._sum;
 	}
 
-	private static double aggregateTernaryGeneric(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int rl, int ru)
+	/**
+	 * Sparse-driven kernel of the ternary aggregate over rows [rl, ru). Iterates the
+	 * non-zeros of the sparse input with the fewest non-zeros and probes the other inputs
+	 * via quickGetValue (in3==null is treated as a literal 1). The output layout matches
+	 * the dense kernel: (sum, correction) in ret (0,0)/(0,1) for ReduceAll, otherwise
+	 * per-column sums at [0,n) and corrections at [n,2n) of ret's dense array.
+	 * 
+	 * @param in1 first input
+	 * @param in2 second input
+	 * @param in3 third input, or null for a literal 1
+	 * @param ret allocated dense output block
+	 * @param ixFn aggregation direction
+	 * @param rl row lower bound (inclusive)
+	 * @param ru row upper bound (exclusive)
+	 */
+	private static void aggregateTernaryGeneric(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru)
 	{		
 		//compute block operations
 		KahanObject kbuff = new KahanObject(0, 0);
 		KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+		
+		//guaranteed to have at least one sparse input; order the inputs by nnz so that the
+		//sparse input with the fewest non-zeros drives the iteration (dense and null inputs
+		//are keyed as Long.MAX_VALUE; the stable sort keeps a null in3 at the end)
+		MatrixBlock[] blocks = new MatrixBlock[]{in1, in2, in3};
+		Arrays.sort(blocks, new Comparator<MatrixBlock>() {
+			@Override
+			public int compare(MatrixBlock o1, MatrixBlock o2) {
+				long nnz1 = (o1!=null && o1.sparse) ? o1.nonZeros : Long.MAX_VALUE;
+				long nnz2 = (o2!=null && o2.sparse) ? o2.nonZeros : Long.MAX_VALUE;
+				return Long.compare(nnz1, nnz2);
+			}
+		});
+		MatrixBlock lin1 = blocks[0];
+		MatrixBlock lin2 = blocks[1];
+		MatrixBlock lin3 = blocks[2];
+		
+		SparseBlock a = lin1.sparseBlock;
+		//only the non-zeros of lin1 are visited; zeros of lin1 contribute nothing to the products
 		final int n = in1.clen;
 		
-		if( in1.sparse )
+		if( ixFn instanceof ReduceAll ) //tak+*
 		{
-			SparseBlock a = in1.sparseBlock;
-			
 			for( int i=rl; i<ru; i++ )
 				if( !a.isEmpty(i) ) {
 					int apos = a.pos(i);
@@ -693,28 +728,40 @@ public class LibMatrixAgg
 					double[] avals = a.values(i);
 					for( int j=apos; j<apos+alen; j++ ) {
 						double val1 = avals[j];
-						double val2 = in2.quickGetValue(i, aix[j]);
+						double val2 = lin2.quickGetValue(i, aix[j]);
 						double val = val1 * val2;
-						if( val != 0 && in3 != null )
-							val *= in3.quickGetValue(i, aix[j]);
+						if( val != 0 && lin3 != null )
+							val *= lin3.quickGetValue(i, aix[j]);
 						kplus.execute2( kbuff, val );							
 					}
 				}	
+			ret.quickSetValue(0, 0, kbuff._sum);
+			ret.quickSetValue(0, 1, kbuff._correction);
 		}
-		else //generic case
+		else //tack+*
 		{
+			double[] c = ret.getDenseBlock();
 			for( int i=rl; i<ru; i++ )
-				for( int j=0; j<n; j++ ){
-					double val1 = in1.quickGetValue(i, j);
-					double val2 = in2.quickGetValue(i, j);
-					double val = val1 * val2;
-					if( in3 != null )
-						val *= in3.quickGetValue(i, j);
-					kplus.execute2( kbuff, val );		
-				}
+				if( !a.isEmpty(i) ) {
+					int apos = a.pos(i);
+					int alen = a.size(i);
+					int[] aix = a.indexes(i);
+					double[] avals = a.values(i);
+					for( int j=apos; j<apos+alen; j++ ) {
+						int colIx = aix[j];
+						double val1 = avals[j];
+						double val2 = lin2.quickGetValue(i, colIx);
+						double val = val1 * val2;
+						if( val != 0 && lin3 != null )
+							val *= lin3.quickGetValue(i, colIx);
+						//update the running (sum, correction) of column colIx
+						kbuff._sum = c[colIx];
+						kbuff._correction = c[colIx+n];
+						kplus.execute2( kbuff, val );	
+						c[colIx] = kbuff._sum;
+						c[colIx+n] = kbuff._correction;	
+					}
+				}	
 		}
-	
-		return kbuff._sum;
 	}
 	
 
@@ -3492,37 +3539,43 @@ public class LibMatrixAgg
 		}		
 	}
 
-	private static class AggTernaryTask extends AggTask 
+	/**
+	 * Task computing a thread-local partial ternary aggregate over rows [rl, ru).
+	 * The returned partial result is combined by the caller.
+	 */
+	private static class AggTernaryTask implements Callable<MatrixBlock>
 	{
-		private MatrixBlock _in1  = null;
-		private MatrixBlock _in2  = null;
-		private MatrixBlock _in3  = null;
-		private double _ret = -1;
-		private int _rl = -1;
-		private int _ru = -1;
+		private final MatrixBlock _in1;
+		private final MatrixBlock _in2;
+		private final MatrixBlock _in3;
+		//output template on construction (only its dimensions are used); replaced by the
+		//thread-local partial result in call()
+		private MatrixBlock _ret = null;
+		private final IndexFunction _ixFn;
+		private final int _rl;
+		private final int _ru;
 
-		protected AggTernaryTask( MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int rl, int ru ) 
+		protected AggTernaryTask( MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru ) 
 			throws DMLRuntimeException
 		{
 			_in1 = in1;	
 			_in2 = in2;	
-			_in3 = in3;				
+			_in3 = in3;		
+			_ret = ret;
+			_ixFn = ixFn;
 			_rl = rl;
 			_ru = ru;
 		}
 		
 		@Override
-		public Object call() throws DMLRuntimeException
+		public MatrixBlock call() throws DMLRuntimeException
 		{
+			//thread-local allocation for partial aggregation
+			_ret = new MatrixBlock(_ret.rlen, _ret.clen, false);
+			_ret.allocateDenseBlock();
+			
 			if( !_in1.sparse && !_in2.sparse && (_in3==null||!_in3.sparse) ) //DENSE
-				_ret = aggregateTernaryDense(_in1, _in2, _in3, _rl, _ru);
+				aggregateTernaryDense(_in1, _in2, _in3, _ret, _ixFn, _rl, _ru);
 			else //GENERAL CASE
-				_ret = aggregateTernaryGeneric(_in1, _in2, _in3, _rl, _ru);
+				aggregateTernaryGeneric(_in1, _in2, _in3, _ret, _ixFn, _rl, _ru);
+			
+			//recompute non-zeros of partial result
+			_ret.recomputeNonZeros();
 			
-			return null;
-		}
-		
-		public double getResult() {
 			return _ret;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index fb4f196..5fc69ce 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -59,7 +59,6 @@ import org.apache.sysml.runtime.functionobjects.RevIndex;
 import org.apache.sysml.runtime.functionobjects.SortIndex;
 import org.apache.sysml.runtime.functionobjects.SwapIndex;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
-import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -68,6 +67,7 @@ import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
@@ -4886,7 +4886,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		return ret;
 	}
 
-	public ScalarObject aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, AggregateBinaryOperator op) 
+	/**
+	 * Ternary aggregate (Kahan sum of m1*m2*m3) into a matrix output.
+	 * For ReduceRow the output is 2 x ncol(m1) (column sums and corrections); for any
+	 * other index function it is 1 x 2 (sum and correction).
+	 * 
+	 * @param m1 first input
+	 * @param m2 second input
+	 * @param m3 third input, or null for a literal 1
+	 * @param ret output block to reuse, or null to allocate a new one
+	 * @param op ternary aggregate operator (KahanPlus over Multiply only)
+	 * @param inCP if true and the aggregate has corrections, the correction row/column is dropped
+	 * @return the output block
+	 * @throws DMLRuntimeException for unsupported operators or execution failures
+	 */
+	public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, MatrixBlock ret, AggregateTernaryOperator op, boolean inCP) 
 		throws DMLRuntimeException
 	{
 		//check input dimensions and operators
@@ -4895,15 +4895,23 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		if( !( op.aggOp.increOp.fn instanceof KahanPlus && op.binaryFn instanceof Multiply) )
 			throw new DMLRuntimeException("Unsupported operator for aggregate tertiary operations.");
 		
+		//create output matrix block w/ corrections
+		//(ReduceRow: 2 x ncol with sums and corrections; otherwise 1 x 2 with sum and correction;
+		//NOTE(review): ReduceCol also gets the 1 x 2 layout here - confirm it is never requested)
+		int rl = (op.indexFn instanceof ReduceRow) ? 2 : 1;
+		int cl = (op.indexFn instanceof ReduceRow) ? m1.clen : 2;
+		if( ret == null )
+			ret = new MatrixBlock(rl, cl, false);
+		else
+			ret.reset(rl, cl, false);
+				
 		//execute ternary aggregate function
-		double val = -1;
 		if( op.getNumThreads() > 1 )
-			val = LibMatrixAgg.aggregateTernary(m1, m2, m3, op.getNumThreads());
+			ret = LibMatrixAgg.aggregateTernary(m1, m2, m3, ret, op, op.getNumThreads());
 		else
-			val = LibMatrixAgg.aggregateTernary(m1, m2, m3);
+			ret = LibMatrixAgg.aggregateTernary(m1, m2, m3, ret, op);
 		
-		//create output
-		return new DoubleObject(val);
+		//in CP, drop the correction row/column; presumably other backends keep the
+		//corrections for subsequent cross-block aggregation - confirm
+		if(op.aggOp.correctionExists && inCP)
+			ret.dropLastRowsOrColums(op.aggOp.correctionLocation);
+		return ret;
 	}
 
 	public MatrixBlock  uaggouterchainOperations(MatrixBlock mbLeft, MatrixBlock mbRight, MatrixBlock mbOut, BinaryOperator bOp, AggregateUnaryOperator uaggOp) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java b/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java
new file mode 100644
index 0000000..7ffdaff
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysml.runtime.matrix.operators;
+
+import java.io.Serializable;
+
+import org.apache.sysml.runtime.functionobjects.IndexFunction;
+import org.apache.sysml.runtime.functionobjects.ValueFunction;
+
+
+/**
+ * Operator for ternary aggregates: an element-wise function (binaryFn) combines the
+ * inputs, and an aggregate operator (aggOp) reduces the results along the direction
+ * given by an index function (indexFn). The operator also carries its degree of parallelism.
+ */
+public class AggregateTernaryOperator extends Operator implements Serializable
+{
+	private static final long serialVersionUID = 4251745081160216784L;
+	
+	/** inner, element-wise function applied to the inputs */
+	public ValueFunction binaryFn;
+	
+	/** outer aggregation applied to the element-wise results */
+	public AggregateOperator aggOp;
+	
+	/** aggregation direction */
+	public IndexFunction indexFn;
+	
+	/** degree of parallelism, i.e., number of threads */
+	private int k;
+	
+	/**
+	 * Creates a ternary aggregate operator with a degree of parallelism of 1
+	 * (e.g., for distributed operations).
+	 * 
+	 * @param inner element-wise function
+	 * @param outer aggregate operator
+	 * @param ixfun index function defining the aggregation direction
+	 */
+	public AggregateTernaryOperator(ValueFunction inner, AggregateOperator outer, IndexFunction ixfun) {
+		this(inner, outer, ixfun, 1);
+	}
+	
+	/**
+	 * Creates a ternary aggregate operator with the given degree of parallelism.
+	 * 
+	 * @param inner element-wise function
+	 * @param outer aggregate operator
+	 * @param ixfun index function defining the aggregation direction
+	 * @param numThreads degree of parallelism
+	 */
+	public AggregateTernaryOperator(ValueFunction inner, AggregateOperator outer, IndexFunction ixfun, int numThreads)
+	{
+		this.binaryFn = inner;
+		this.aggOp = outer;
+		this.indexFn = ixfun;
+		this.k = numThreads;
+		
+		//only sum-product is supported so far, which is sparse-safe
+		this.sparseSafe = true;
+	}
+	
+	/**
+	 * @return the degree of parallelism (number of threads) of this operator
+	 */
+	public int getNumThreads() {
+		return this.k;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java b/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java
new file mode 100644
index 0000000..6168025
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.ternary;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.instructions.Instruction;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
+ * Integration tests for ternary aggregates over dense/sparse vector and matrix inputs:
+ * the full aggregate script (TernaryAggregateRC, expected to compile to tak+*) and the
+ * column-wise aggregate colSums(A*B*C) (TernaryAggregateC, expected to compile to tack+*,
+ * or tak+* for vector inputs). Results are compared against R. With sum-product rewrites
+ * enabled (CP/Spark), the tests additionally check that the fused opcode was executed.
+ */
+public class TernaryAggregateTest extends AutomatedTestBase 
+{
+	private final static String TEST_NAME1 = "TernaryAggregateRC";
+	private final static String TEST_NAME2 = "TernaryAggregateC";
+	
+	private final static String TEST_DIR = "functions/ternary/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + TernaryAggregateTest.class.getSimpleName() + "/";
+	private final static double eps = 1e-8;
+	
+	private final static int rows = 1111;
+	private final static int cols = 1011;
+	
+	private final static double sparsity1 = 0.7;
+	private final static double sparsity2 = 0.3;
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); 
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) ); 
+	}
+
+	@Test
+	public void testTernaryAggregateRCDenseVectorCP() {
+		runTernaryAggregateTest(TEST_NAME1, false, true, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseVectorCP() {
+		runTernaryAggregateTest(TEST_NAME1, true, true, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCDenseMatrixCP() {
+		runTernaryAggregateTest(TEST_NAME1, false, false, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseMatrixCP() {
+		runTernaryAggregateTest(TEST_NAME1, true, false, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCDenseVectorSP() {
+		runTernaryAggregateTest(TEST_NAME1, false, true, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseVectorSP() {
+		runTernaryAggregateTest(TEST_NAME1, true, true, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCDenseMatrixSP() {
+		runTernaryAggregateTest(TEST_NAME1, false, false, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseMatrixSP() {
+		runTernaryAggregateTest(TEST_NAME1, true, false, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCDenseVectorMR() {
+		runTernaryAggregateTest(TEST_NAME1, false, true, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseVectorMR() {
+		runTernaryAggregateTest(TEST_NAME1, true, true, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCDenseMatrixMR() {
+		runTernaryAggregateTest(TEST_NAME1, false, false, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseMatrixMR() {
+		runTernaryAggregateTest(TEST_NAME1, true, false, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testTernaryAggregateCDenseVectorCP() {
+		runTernaryAggregateTest(TEST_NAME2, false, true, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCSparseVectorCP() {
+		runTernaryAggregateTest(TEST_NAME2, true, true, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCDenseMatrixCP() {
+		runTernaryAggregateTest(TEST_NAME2, false, false, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCSparseMatrixCP() {
+		runTernaryAggregateTest(TEST_NAME2, true, false, true, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCDenseVectorSP() {
+		runTernaryAggregateTest(TEST_NAME2, false, true, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTernaryAggregateCSparseVectorSP() {
+		runTernaryAggregateTest(TEST_NAME2, true, true, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTernaryAggregateCDenseMatrixSP() {
+		runTernaryAggregateTest(TEST_NAME2, false, false, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTernaryAggregateCSparseMatrixSP() {
+		runTernaryAggregateTest(TEST_NAME2, true, false, true, ExecType.SPARK);
+	}
+	
+	//additional tests to check default without rewrites
+	//(RC variants must run the RC script, i.e., TEST_NAME1)
+	
+	@Test
+	public void testTernaryAggregateRCDenseVectorCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME1, false, true, false, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseVectorCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME1, true, true, false, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCDenseMatrixCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME1, false, false, false, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateRCSparseMatrixCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME1, true, false, false, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCDenseVectorCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME2, false, true, false, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCSparseVectorCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME2, true, true, false, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCDenseMatrixCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME2, false, false, false, ExecType.CP);
+	}
+	
+	@Test
+	public void testTernaryAggregateCSparseMatrixCPNoRewrite() {
+		runTernaryAggregateTest(TEST_NAME2, true, false, false, ExecType.CP);
+	}
+	
+	/**
+	 * Runs the given test script in SystemML and R, compares the results, and (if rewrites
+	 * are enabled and the backend is not MR) checks that the fused ternary aggregate opcode
+	 * appears among the heavy hitters. Global settings are restored afterwards.
+	 * 
+	 * @param testname test script name (TEST_NAME1 or TEST_NAME2)
+	 * @param sparse use the sparse (sparsity2) instead of the dense (sparsity1) input
+	 * @param vectors use a (rows*cols x 1) column vector instead of a rows x cols matrix
+	 * @param rewrites enable sum-product rewrites
+	 * @param et execution type
+	 */
+	private void runTernaryAggregateTest(String testname, boolean sparse, boolean vectors, boolean rewrites, ExecType et)
+	{
+		//rtplatform for MR
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		switch( et ){
+			case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+			case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+			default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+		}
+	
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+	
+		boolean rewritesOld = OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
+		
+		try
+		{
+			TestConfiguration config = getTestConfiguration(testname);
+			loadTestConfiguration(config);
+			
+			OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES = rewrites;
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + testname + ".dml";
+			programArgs = new String[]{"-stats","-args", input("A"), output("R")};
+			
+			fullRScriptName = HOME + testname + ".R";
+			rCmd = "Rscript" + " " + fullRScriptName + " " + 
+				inputDir() + " " + expectedDir();
+	
+			//generate actual dataset
+			double sparsity = sparse ? sparsity2 : sparsity1;
+			double[][] A = getRandomMatrix(vectors ? rows*cols : rows, 
+					vectors ? 1 : cols, 0, 1, sparsity, 17); 
+			writeInputMatrixWithMTD("A", A, true);
+			
+			//run test cases
+			runTest(true, false, null, -1); 
+			runRScript(true); 
+			
+			//compare output matrices 
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("R");
+			TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
+			
+			//check for rewritten patterns in statistics output
+			//(column-wise aggregates of a column vector collapse to a full aggregate)
+			if( rewrites && et != ExecType.MR ) {
+				String opcode = ((et == ExecType.SPARK) ? Instruction.SP_INST_PREFIX : "") + 
+					(((testname.equals(TEST_NAME1) || vectors ) ? "tak+*" : "tack+*"));
+				Assert.assertTrue("Expected heavy hitter opcode not found: " + opcode,
+					Statistics.getCPHeavyHitterOpCodes().contains(opcode));
+			}
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES = rewritesOld;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateC.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateC.R b/src/test/scripts/functions/ternary/TernaryAggregateC.R
new file mode 100644
index 0000000..8e83ebf
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateC.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# R reference for the column-wise ternary aggregate test:
+# R = colSums(A * B * C) with B = 2*A and C = 3*A, written as a 1 x ncol(A) matrix.
+# args[1]: input directory (reads A.mtx); args[2]: output directory (writes R)
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+A = as.matrix(readMM(paste(args[1], "A.mtx", sep="")))
+B = A * 2;
+C = A * 3;
+
+# colSums returns a plain vector; convert to a 1 x ncol row matrix
+R = t(as.matrix(colSums(A * B * C)));
+
+writeMM(as(R, "CsparseMatrix"), paste(args[2], "R", sep="")); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateC.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateC.dml b/src/test/scripts/functions/ternary/TernaryAggregateC.dml
new file mode 100644
index 0000000..a171ff4
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateC.dml
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1);
+B = A * 2;
+C = A * 3;
+# NOTE(review): the empty if-block presumably splits the script into separate statement blocks so B and C are not simplified into the expression below - confirm intent.
+if(1==1){}
+# Column-wise sums of the element-wise product A * B * C (one value per column of A).
+R = colSums(A * B * C);
+# A is read from the path given as $1; R is written to the path given as $2.
+write(R, $2);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateRC.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateRC.R b/src/test/scripts/functions/ternary/TernaryAggregateRC.R
new file mode 100644
index 0000000..96e793e
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateRC.R
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+# Computes sum(A * B * C) with B = A * 2 and C = A * 3, the same expression as TernaryAggregateRC.dml.
+library("Matrix")
+# args[1]: path prefix of the Matrix Market input "A.mtx"; args[2]: path prefix of the output "R".
+A = as.matrix(readMM(paste(args[1], "A.mtx", sep="")))
+B = A * 2;
+C = A * 3;
+# Sum over all cells of the element-wise product, wrapped as a 1 x 1 matrix so writeMM can write it.
+s = sum(A * B * C);
+R = as.matrix(s);
+# Written in sparse Matrix Market format to <args[2]>R.
+writeMM(as(R, "CsparseMatrix"), paste(args[2], "R", sep="")); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateRC.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateRC.dml b/src/test/scripts/functions/ternary/TernaryAggregateRC.dml
new file mode 100644
index 0000000..485570d
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateRC.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1);
+B = A * 2;
+C = A * 3;
+# NOTE(review): the empty if-block presumably splits the script into separate statement blocks so B and C are not simplified into the expression below - confirm intent.
+if(1==1){}
+# Sum over all cells of the element-wise product A * B * C, converted to a 1 x 1 matrix for writing.
+s = sum(A * B * C);
+R = as.matrix(s);
+# A is read from the path given as $1; R is written to the path given as $2.
+write(R, $2);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
index 83f217d..784177d 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
@@ -31,7 +31,8 @@ import org.junit.runners.Suite;
 	CTableMatrixIgnoreZerosTest.class,
 	CTableSequenceTest.class,
 	QuantileWeightsTest.class,
-	TableOutputTest.class
+	TableOutputTest.class,
+	TernaryAggregateTest.class,
 })