You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/09/01 19:12:09 UTC

[1/3] systemml git commit: [SYSTEMML-1876] Fix size propagation QuaternaryOp wdivmm (stratstats)

Repository: systemml
Updated Branches:
  refs/heads/master 55b734227 -> 8dbc93022


[SYSTEMML-1876] Fix size propagation QuaternaryOp wdivmm (stratstats)

This patch fixes the worst-case size propagation (i.e., the propagation
of upper bounds instead of exact size information) for quaternary
operators of type wdivmm basic (cellwise). A branching issue (a missing
`else`) caused the output sizes computed for wdivmm basic to be
overwritten by the output sizes for wdivmm right. The issue did not show
up before because it only occurs in the special case where the sizes (at
least the dimensions) of the inputs cannot be inferred exactly.

Furthermore, this patch includes some minor cleanups of the common MR
lop construction code (e.g., renaming obtainlU/obtainlV to
constructLeftFactorMRLop/constructRightFactorMRLop).

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/ec352151
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/ec352151
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/ec352151

Branch: refs/heads/master
Commit: ec352151d83ad93320928d86726057c900c3b9f8
Parents: 55b7342
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Aug 31 18:39:08 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Sep 1 12:13:24 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/QuaternaryOp.java     | 70 +++++++++-----------
 1 file changed, 33 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/ec352151/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
index bfbaae7..6395d6a 100644
--- a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
@@ -59,7 +59,6 @@ import org.apache.sysml.runtime.matrix.mapred.DistributedCacheInput;
  */
 public class QuaternaryOp extends Hop implements MultiThreadedHop
 {
-
 	//config influencing mr operator selection (for testing purposes only) 
 	public static boolean FORCE_REPLICATION = false;
 	
@@ -321,7 +320,7 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 		setLops( wsloss );
 	}
 
-	private Lop obtainlU(Hop U, Hop V, boolean cacheU, double m1Size) throws HopsException, LopsException {
+	private Lop constructLeftFactorMRLop(Hop U, Hop V, boolean cacheU, double m1Size) throws HopsException, LopsException {
 		Lop lU = null;
 		if (cacheU) {
 			// partitioning of U for read through distributed cache
@@ -331,28 +330,26 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 				lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE,
 						(m1Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP,
 						PDataPartitionFormat.ROW_BLOCK_WISE_N);
-				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(),
-						U.getNnz());
+				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
 				setLineNumbers(lU);
 			}
-		} else {
+		}
+		else {
 			// replication of U for shuffle to target block
 			Lop offset = createOffsetLop(V, false); // ncol of t(V) -> nrow of V determines num replicates
 			lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
-			lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
-					U.getNnz());
+			lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
 			setLineNumbers(lU);
-
+			
 			Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-			grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
-					-1);
+			grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
 			setLineNumbers(grpU);
 			lU = grpU;
 		}
 		return lU;
 	}
 
-	private Lop obtainlV(Hop U, Hop V, boolean cacheV, double m2Size) throws HopsException, LopsException {
+	private Lop constructRightFactorMRLop(Hop U, Hop V, boolean cacheV, double m2Size) throws HopsException, LopsException {
 		Lop lV = null;
 		if (cacheV) {
 			// partitioning of V for read through distributed cache
@@ -362,27 +359,24 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 				lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE,
 						(m2Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP,
 						PDataPartitionFormat.ROW_BLOCK_WISE_N);
-				lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(),
-						V.getNnz());
+				lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
 				setLineNumbers(lV);
 			}
-		} else {
+		} 
+		else {
 			// replication of t(V) for shuffle to target block
 			Transform ltV = new Transform(V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(),
 					getValueType(), ExecType.MR);
-			ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
-					V.getNnz());
+			ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
 			setLineNumbers(ltV);
-
+			
 			Lop offset = createOffsetLop(U, false); // nrow of U determines num replicates
 			lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
-			lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
-					V.getNnz());
+			lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
 			setLineNumbers(lV);
-
+			
 			Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-			grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
-					-1);
+			grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
 			setLineNumbers(grpV);
 			lV = grpV;
 		}
@@ -464,8 +458,8 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 				setLineNumbers(grpW);
 			}
 
-			Lop lU = obtainlU(U, V, cacheU, m1Size);
-			Lop lV = obtainlV(U, V, cacheV, m2Size);
+			Lop lU = constructLeftFactorMRLop(U, V, cacheU, m1Size);
+			Lop lV = constructRightFactorMRLop(U, V, cacheV, m2Size);
 
 			//reduce-side wsloss w/ or without broadcast
 			Lop wsloss = new WeightedSquaredLossR( 
@@ -613,8 +607,8 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
 			setLineNumbers(grpX);
 
-			Lop lU = obtainlU(U, V, cacheU, m1Size);
-			Lop lV = obtainlV(U, V, cacheV, m2Size);
+			Lop lU = constructLeftFactorMRLop(U, V, cacheU, m1Size);
+			Lop lV = constructRightFactorMRLop(U, V, cacheV, m2Size);
 
 			//reduce-side wsig w/ or without broadcast
 			Lop wsigmoid = new WeightedSigmoidR( 
@@ -757,8 +751,8 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
 			setLineNumbers(grpX);
 
-			Lop lU = obtainlU(U, V, cacheU, m1Size);
-			Lop lV = obtainlV(U, V, cacheV, m2Size);
+			Lop lU = constructLeftFactorMRLop(U, V, cacheU, m1Size);
+			Lop lV = constructRightFactorMRLop(U, V, cacheV, m2Size);
 
 			//reduce-side wdivmm w/ or without broadcast
 			Lop wdivmm = new WeightedDivMMR( grpW, lU, lV, grpX, 
@@ -919,8 +913,8 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), -1);
 			setLineNumbers(grpX);
 
-			Lop lU = obtainlU(U, V, cacheU, m1Size);
-			Lop lV = obtainlV(U, V, cacheV, m2Size);
+			Lop lU = constructLeftFactorMRLop(U, V, cacheU, m1Size);
+			Lop lV = constructRightFactorMRLop(U, V, cacheV, m2Size);
 
 			//reduce-side wcemm w/ or without broadcast
 			Lop wcemm = new WeightedCrossEntropyR( grpX, lU, lV, eps.constructLops(),
@@ -1076,8 +1070,8 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
 			setLineNumbers(grpX);
 
-			Lop lU = obtainlU(U, V, cacheU, m1Size);
-			Lop lV = obtainlV(U, V, cacheV, m2Size);
+			Lop lU = constructLeftFactorMRLop(U, V, cacheU, m1Size);
+			Lop lV = constructRightFactorMRLop(U, V, cacheV, m2Size);
 
 			//reduce-side wumm w/ or without broadcast
 			Lop wumm = new WeightedUnaryMMR( 
@@ -1254,7 +1248,7 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 					MatrixCharacteristics mcW = memo.getAllInputStats(getInput().get(0));
 					ret = new long[]{mcW.getRows(), mcW.getCols(), mcW.getNonZeros()};	
 				}
-				if( _baseType == 1 || _baseType == 3 ) { //left (w/ transpose or w/ epsilon)
+				else if( _baseType == 1 || _baseType == 3 ) { //left (w/ transpose or w/ epsilon)
 					MatrixCharacteristics mcV = memo.getAllInputStats(getInput().get(2));
 					ret = new long[]{mcV.getRows(), mcV.getCols(), -1};
 				}
@@ -1329,24 +1323,26 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 					Hop inW = getInput().get(0);
 					setDim1( inW.getDim1() );
 					setDim2( inW.getDim2() );
-					setNnz( inW.getNnz() );	
+					setNnz( inW.getNnz() );
 				}
 				else if( _baseType == 1 || _baseType == 3 ){ //left (w/ transpose or w/ epsilon)
 					Hop inV = getInput().get(2);
 					setDim1( inV.getDim1() );
-					setDim2( inV.getDim2() );				
+					setDim2( inV.getDim2() );
+					setNnz( -1 ); //reset
 				}
 				else { //right
 					Hop inU = getInput().get(1);
 					setDim1( inU.getDim1() );
-					setDim2( inU.getDim2() );	
+					setDim2( inU.getDim2() );
+					setNnz( -1 ); //reset
 				}
 				break;
 			}
 			
 			default:
 				break;
-		}	
+		}
 	}
 	
 	@Override


[3/3] systemml git commit: [SYSTEMML-1888] Fix parfor optimizer exec type selection (msvm, kmeans)

Posted by mb...@apache.org.
[SYSTEMML-1888] Fix parfor optimizer exec type selection (msvm,kmeans)

This patch makes a number of smaller fixes to the execution type
selection of the parfor optimizer in order to enable the compilation of
remote parfor plans in the presence of many inner iterations but
relatively small input data. In detail, this includes (besides various
cleanups):

(1) Fixes for determining the number of inner iterations (now including
for and while loops, with defaults if necessary),

(2) A modified minimum data size threshold that is relaxed (by a factor
of 10) when the maximum number of inner iterations is large, and

(3) Better handling of the what-if memory estimation (e.g., all
operations in CP), which no longer accounts for unnecessary hops that
are never compiled to lops due to hop-lop rewrites.

On the perftest MSVM scenario (1M x 1K, sparse, 150 classes, 25
iterations), this patch improved performance from 288s to 94s on a 1+6
node cluster (including read and Spark context creation).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8dbc9302
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8dbc9302
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8dbc9302

Branch: refs/heads/master
Commit: 8dbc93022a01aae309354c7b2b2f0eee9ec11aad
Parents: 9178a95
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Sep 1 01:32:11 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Sep 1 12:13:27 2017 -0700

----------------------------------------------------------------------
 .../parfor/RemoteParForSparkWorker.java         |   4 +-
 .../controlprogram/parfor/opt/OptNode.java      | 328 +++++++------------
 .../parfor/opt/OptTreeConverter.java            |  25 +-
 .../parfor/opt/OptimizerRuleBased.java          |  15 +-
 4 files changed, 145 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/8dbc9302/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index cd4a673..e1410da 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -84,7 +84,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		ArrayList<String> tmp = RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars );
 		for( String val : tmp )
 			ret.add(new Tuple2<Long,String>(_workerID, val));
-			
+		
 		return ret.iterator();
 	}
 
@@ -102,7 +102,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		//parse and setup parfor body program
 		ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID);
 		_childBlocks = body.getChildBlocks();
-		_ec          = body.getEc();				
+		_ec          = body.getEc();
 		_resultVars  = body.getResultVarNames();
 		_numTasks    = 0;
 		_numIters    = 0;

http://git-wip-us.apache.org/repos/asf/systemml/blob/8dbc9302/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
index 22126fe..193ce3e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
@@ -38,7 +38,6 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionForma
  */
 public class OptNode 
 {
-	
 	public enum NodeType{
 		GENERIC,
 		FUNCCALL,
@@ -47,7 +46,11 @@ public class OptNode
 		FOR,
 		PARFOR,
 		INST,
-		HOP
+		HOP;
+		public boolean isLoop() {
+			return this == WHILE ||
+				this == FOR || this == PARFOR;
+		}
 	}
 	
 	public enum ExecType { 
@@ -96,260 +99,203 @@ public class OptNode
 	private int                       _beginLine = -1;
 	private int                       _endLine = -1;
 	
-	public OptNode( NodeType type )
-	{
+	public OptNode( NodeType type ) {
 		this(type, null);
 	}
 
-	public OptNode( NodeType ntype, ExecType etype )
-	{
+	public OptNode( NodeType ntype, ExecType etype ) {
 		_ntype = ntype;
 		_etype = etype;
-		
 		_k = 1;
 	}
 	
 	///////
 	//getters and setters
 	
-	public NodeType getNodeType() 
-	{
+	public NodeType getNodeType() {
 		return _ntype;
 	}
 	
-	public void setNodeType(NodeType type) 
-	{
+	public void setNodeType(NodeType type) {
 		_ntype = type;
 	}
 	
-	public ExecType getExecType() 
-	{
+	public ExecType getExecType() {
 		return _etype;
 	}
 	
-	public void setExecType(ExecType type) 
-	{
+	public void setExecType(ExecType type) {
 		_etype = type;
 	}
 	
-	public void setID( long id )
-	{
+	public void setID( long id ) {
 		_id = id;
 	}
 	
-	public long getID( )
-	{
+	public long getID( ) {
 		return _id;
 	}
 	
-	public void addParam(ParamType ptype, String val)
-	{
+	public void addParam(ParamType ptype, String val) {
 		if( _params == null )
 			_params = new HashMap<ParamType, String>();
-		
 		_params.put(ptype, val);
 	}
 
-	public void setParams( HashMap<ParamType,String> params )
-	{
+	public void setParams( HashMap<ParamType,String> params ) {
 		_params = params;
 	}
 	
-	public String getParam( ParamType type )
-	{
-		String ret = null;
-		if( _params != null )
-			ret = _params.get(type);
-		return ret;
+	public String getParam( ParamType type ) {
+		return (_params != null) ?
+			_params.get(type) : null;
 	}
 	
-	public int getBeginLine()
-	{
+	public int getBeginLine() {
 		return _beginLine;
 	}
 	
-	public void setBeginLine( int line )
-	{
+	public void setBeginLine( int line ) {
 		_beginLine = line;
 	}
 	
-	public int getEndLine()
-	{
+	public int getEndLine() {
 		return _endLine;
 	}
 	
-	public void setEndLine( int line )
-	{
+	public void setEndLine( int line ) {
 		_endLine = line;
 	}
 
-	public void setLineNumbers( int begin, int end )
-	{
+	public void setLineNumbers( int begin, int end ) {
 		setBeginLine( begin );
 		setEndLine( end );
 	}
 	
-	public void addChild( OptNode child )
-	{
+	public void addChild( OptNode child ) {
 		if( _childs==null )
 			_childs = new ArrayList<OptNode>();
-		
 		_childs.add( child );
 	}
 	
-	public void addChilds( ArrayList<OptNode> childs )
-	{
+	public void addChilds( ArrayList<OptNode> childs ) {
 		if( _childs==null )
 			_childs = new ArrayList<OptNode>();
-		
-		_childs.addAll( childs );		
+		_childs.addAll( childs );
 	}
 	
-	public void setChilds(ArrayList<OptNode> childs) 
-	{
+	public void setChilds(ArrayList<OptNode> childs) {
 		_childs = childs;
 	}
 	
-	public ArrayList<OptNode> getChilds() 
-	{
+	public ArrayList<OptNode> getChilds() {
 		return _childs;
 	}
 	
-	
-	public int getK() 
-	{
+	public int getK() {
 		return _k;
 	}
 
-	public void setK(int k) 
-	{
+	public void setK(int k) {
 		_k = k;
 	}
 	
-	public OptNodeStatistics getStatistics()
-	{
+	public OptNodeStatistics getStatistics() {
 		return _stats;
 	}
 	
-	public void setStatistics(OptNodeStatistics stats)
-	{
+	public void setStatistics(OptNodeStatistics stats) {
 		_stats = stats;
 	}
 
-	public boolean exchangeChild(OptNode oldNode, OptNode newNode) 
-	{
-		boolean ret = false;
-		
-		if( _childs != null )
-			for( int i=0; i<_childs.size(); i++ )
-				if( _childs.get(i) == oldNode )
-				{
-					_childs.set(i, newNode);
-					ret = true;
-				}
+	public boolean exchangeChild(OptNode oldNode, OptNode newNode) {
+		if( isLeaf() )
+			return false;
 		
+		boolean ret = false;
+		for( int i=0; i<_childs.size(); i++ )
+			if( _childs.get(i) == oldNode ) {
+				_childs.set(i, newNode);
+				ret = true;
+			}
 		return ret;
 	}
 
-	public boolean isLeaf()
-	{
+	public boolean isLeaf() {
 		return ( _childs == null || _childs.isEmpty() );
 	}
 
-	public boolean hasOnlySimpleChilds()
-	{
-		boolean ret = true;
-		if( !isLeaf() )
-			for( OptNode n : _childs ) {
-				if( n.getNodeType()==NodeType.GENERIC )
-					ret &= n.hasOnlySimpleChilds();
-				//functions, loops, branches
-				else if( n.getNodeType()!=NodeType.HOP )
-					return false;
-			}
+	public boolean hasOnlySimpleChilds() {
+		if( isLeaf() )
+			return true;
 		
+		boolean ret = true;
+		for( OptNode n : _childs ) {
+			if( n.getNodeType()==NodeType.GENERIC )
+				ret &= n.hasOnlySimpleChilds();
+			//functions, loops, branches
+			else if( n.getNodeType()!=NodeType.HOP )
+				return false;
+		}
 		return ret;
 	}
 
-	public String getInstructionName() 
-	{
+	public String getInstructionName() {
 		return String.valueOf(_etype) + Lop.OPERAND_DELIMITOR + getParam(ParamType.OPSTRING);
 	}
 
-	public boolean isRecursive()
-	{
-		boolean ret = false;
+	public boolean isRecursive() {
 		String rec = getParam(ParamType.RECURSIVE_CALL);
-		if( rec != null )
-			ret = Boolean.parseBoolean(rec);
-		return ret;
+		return (rec != null) ?
+			Boolean.parseBoolean(rec) : false;
 	}
 	
 
 	///////
 	//recursive methods
 
-	public Collection<OptNode> getNodeList()
-	{
+	public Collection<OptNode> getNodeList() {
 		Collection<OptNode> nodes = new LinkedList<OptNode>();
-		
 		if(!isLeaf())
 			for( OptNode n : _childs )
 				nodes.addAll( n.getNodeList() );
 		nodes.add(this);
-		
 		return nodes;
 	}
 
-	public Collection<OptNode> getNodeList( ExecType et )
-	{
+	public Collection<OptNode> getNodeList( ExecType et ) {
 		Collection<OptNode> nodes = new LinkedList<OptNode>();
-		
 		if(!isLeaf())
 			for( OptNode n : _childs )
 				nodes.addAll( n.getNodeList( et ) );
-		
 		if( _etype == et )
 			nodes.add(this);
-		
 		return nodes;
 	}
 
-	public Collection<OptNode> getRelevantNodeList()
-	{
+	public Collection<OptNode> getRelevantNodeList() {
 		Collection<OptNode> nodes = new LinkedList<OptNode>();
-		
 		if( !isLeaf() )
-		{
 			for( OptNode n : _childs )
 				nodes.addAll( n.getRelevantNodeList() );
-		}
-		 
 		if( _ntype == NodeType.PARFOR || _ntype == NodeType.HOP )
-		{
 			nodes.add(this);
-		}
-		
 		return nodes;
 	}
 	
 	
-
-	
 	/**
 	 * Set the plan to a parallel degree of 1 (serial execution).
 	 */
-	public void setSerialParFor()
-	{
+	public void setSerialParFor() {
 		//process parfor nodes
-		if( _ntype == NodeType.PARFOR )
-		{
+		if( _ntype == NodeType.PARFOR ) {
 			_k = 1;
 			_etype = ExecType.CP;
 		}
 		
 		//process childs
-		if( _childs != null )
+		if( !isLeaf() )
 			for( OptNode n : _childs )
 				n.setSerialParFor();
 	}
@@ -359,10 +305,9 @@ public class OptNode
 	 * 
 	 * @return number of plan nodes
 	 */
-	public int size() 
-	{
+	public int size() {
 		int count = 1; //self
-		if( _childs != null )
+		if( !isLeaf() )
 			for( OptNode n : _childs )
 				count += n.size();
 		return count;
@@ -374,27 +319,23 @@ public class OptNode
 	 * 
 	 * @return true of all program blocks and instructions execute on CP
 	 */
-	public boolean isCPOnly()
-	{
-		boolean ret = (_etype == ExecType.CP);		
-		if( _childs != null )
-			for( OptNode n : _childs )
-			{
+	public boolean isCPOnly() {
+		boolean ret = (_etype == ExecType.CP);
+		if( !isLeaf() )
+			for( OptNode n : _childs ) {
 				if( !ret ) break; //early abort if already false
 				ret &= n.isCPOnly();
 			}
 		return ret;
 	}
 
-	public int getTotalK()
-	{
+	public int getTotalK() {
 		int k = 1;		
-		if( _childs != null )
+		if( !isLeaf() )
 			for( OptNode n : _childs )
 				k = Math.max(k, n.getTotalK() );
 		
-		if( _ntype == NodeType.PARFOR )
-		{
+		if( _ntype == NodeType.PARFOR ) {
 			if( _etype==ExecType.CP )
 				k = _k * k;
 			else //MR
@@ -404,104 +345,80 @@ public class OptNode
 		return k;
 	}
 
-	public long getMaxC( long N )
-	{
+	public long getMaxC( long N ) {
 		long maxc = N;
-		if( _childs != null )
+		if( !isLeaf() )
 			for( OptNode n : _childs )
-				maxc = Math.min(maxc, n.getMaxC( N ) );
+				maxc = Math.min(maxc, n.getMaxC( N ));
 		
-		if( _ntype == NodeType.HOP )
-		{
-			String ts = getParam( ParamType.TASK_SIZE );
+		if( _ntype == NodeType.HOP ) {
+			String ts = getParam(ParamType.TASK_SIZE);
 			if( ts != null )
-				maxc = Math.min(maxc, Integer.parseInt(ts) );
+				maxc = Math.min(maxc, Integer.parseInt(ts));
 		}
 		
-		if(    _ntype == NodeType.PARFOR 
-		    && _etype == ExecType.CP    )
-		{
+		if( _ntype == NodeType.PARFOR && _etype == ExecType.CP)
 			maxc = maxc / _k; //intdiv
-		}
 		
 		return maxc;
 	}
 
-	public boolean hasNestedParallelism( boolean flagNested )
-	{
+	public boolean hasNestedParallelism( boolean flagNested ) {
 		boolean ret = false;
-		
-		if( _ntype == NodeType.PARFOR )
-		{
+		//check for parfor in nested context
+		if( _ntype == NodeType.PARFOR ) {
 			if( flagNested ) 
 				return true;
 			flagNested = true;
 		}
 		
-		if( _childs != null )
-			for( OptNode n : _childs )
-			{
-				if( ret ) break; //early abort if already true
-				ret |= n.hasNestedParallelism( flagNested );
-			}
-		
-			ret = true;
-			
+		//recursively process children
+		if( !isLeaf() )
+			for( int i=0; i<_childs.size() && !ret; i++ )
+				ret |= _childs.get(i).hasNestedParallelism( flagNested );
 		return ret;
 	}
 
-	public boolean hasNestedPartitionReads( boolean flagNested )
-	{
-		boolean ret = false;
-		if( isLeaf() )
-		{
+	public boolean hasNestedPartitionReads( boolean flagNested ) {
+		if( isLeaf() ) {
 			//partitioned read identified by selected partition format
 			String tmp = getParam(ParamType.DATA_PARTITION_FORMAT);
-			ret = ( tmp !=null 
-					&& PartitionFormat.valueOf(tmp)._dpf!=PDataPartitionFormat.NONE 
-					&& flagNested );
-		}
-		else
-		{
-			for( OptNode n : _childs )
-			{
-				if( n._ntype == NodeType.PARFOR || n._ntype == NodeType.FOR || n._ntype == NodeType.WHILE )
-					flagNested = true;
-				
-				ret |= n.hasNestedPartitionReads( flagNested );
-				if( ret ) break; //early abort if already true
-			}
+			return ( tmp !=null && flagNested
+				&& PartitionFormat.valueOf(tmp)._dpf!=PDataPartitionFormat.NONE);
 		}
 		
+		boolean ret = false;
+		for( int i=0; i<_childs.size() && !ret; i++ ) {
+			OptNode n = _childs.get(i);
+			if( n._ntype.isLoop() )
+				flagNested = true;
+			ret |= n.hasNestedPartitionReads( flagNested );
+		}
 		return ret;
 	}
 
-	public void checkAndCleanupLeafNodes() 
-	{
-		if( _childs != null )
-			for( int i=0; i<_childs.size(); i++ )
-			{
-				OptNode n = _childs.get(i);
-				n.checkAndCleanupLeafNodes();
-				if( n.isLeaf() && n._ntype != NodeType.HOP && n._ntype != NodeType.INST 
-					&& n._ntype != NodeType.FUNCCALL ) // && n._ntype != NodeType.PARFOR
-				{
-					_childs.remove(i);
-					i--;
-				}
+	public void checkAndCleanupLeafNodes() {
+		if( isLeaf() )
+			return;
+		for( int i=0; i<_childs.size(); i++ ) {
+			OptNode n = _childs.get(i);
+			n.checkAndCleanupLeafNodes();
+			if( n.isLeaf() && n._ntype != NodeType.HOP && n._ntype != NodeType.INST 
+				&& n._ntype != NodeType.FUNCCALL ) {
+				_childs.remove(i);
+				i--;
 			}
+		}
 	}
 
-	public void checkAndCleanupRecursiveFunc(Set<String> stack) 
-	{
+	public void checkAndCleanupRecursiveFunc(Set<String> stack) {
 		//recursive invocation
 		if( !isLeaf() )
 			for( OptNode n : _childs )
 				n.checkAndCleanupRecursiveFunc( stack );
 	
 		//collect and update func info
-		if(_ntype == NodeType.FUNCCALL)
-		{
+		if(_ntype == NodeType.FUNCCALL) {
 			String rec = getParam(ParamType.RECURSIVE_CALL);
 			String fname = getParam(ParamType.OPSTRING);
 			if( rec != null && Boolean.parseBoolean(rec) ) 
@@ -573,23 +490,24 @@ public class OptNode
 	}
 
 	/**
-	 * Determines the maximum problem size of all children.
+	 * Determines the maximum problem size, in terms of the maximum
+	 * total number of inner loop iterations, of the entire subtree.
 	 * 
 	 * @return maximum problem size
 	 */
-	public long getMaxProblemSize() 
-	{
-		long max = 0;
-		if( _childs != null )
+	public long getMaxProblemSize() {
+		//recursively process children
+		long max = 1;
+		if( !isLeaf() )
 			for( OptNode n : _childs )
-				max = Math.max(max, n.getMaxProblemSize());		
-		else
-			max = 1;
+				max = Math.max(max, n.getMaxProblemSize());
 		
-		if( _ntype == NodeType.PARFOR )
-			max = max * Long.parseLong(_params.get(ParamType.NUM_ITERATIONS));
-
+		//scale problem size by number of loop iterations
+		if( _ntype.isLoop() && !isLeaf() ) {
+			String numIter = getParam(ParamType.NUM_ITERATIONS);
+			max *= (numIter != null) ? Long.parseLong(numIter) :
+				CostEstimator.FACTOR_NUM_ITERATIONS;
+		}
 		return max;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8dbc9302/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
index d8cad78..1aae331 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
@@ -358,12 +358,11 @@ public class OptTreeConverter
 			
 			//process body
 			int len = ws.getBody().size();
-			for( int i=0; i<wpb.getChildBlocks().size() && i<len; i++ )
-			{
+			for( int i=0; i<wpb.getChildBlocks().size() && i<len; i++ ) {
 				ProgramBlock lpb = wpb.getChildBlocks().get(i);
 				StatementBlock lsb = ws.getBody().get(i);
 				node.addChild( rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
-			}			
+			}
 		}
 		else if( pb instanceof ForProgramBlock && sb instanceof ForStatementBlock && !(pb instanceof ParForProgramBlock) )
 		{
@@ -392,12 +391,11 @@ public class OptTreeConverter
 			
 			//process body
 			int len = fs.getBody().size();
-			for( int i=0; i<fpb.getChildBlocks().size() && i<len; i++ )
-			{
+			for( int i=0; i<fpb.getChildBlocks().size() && i<len; i++ ) {
 				ProgramBlock lpb = fpb.getChildBlocks().get(i);
 				StatementBlock lsb = fs.getBody().get(i);
 				node.addChild( rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
-			}	
+			}
 		}
 		else if( pb instanceof ParForProgramBlock && sb instanceof ParForStatementBlock )
 		{
@@ -409,11 +407,10 @@ public class OptTreeConverter
 			_hlMap.putProgMapping(sb, pb, node);
 			node.setK( fpb.getDegreeOfParallelism() );
 			long N = fpb.getNumIterations();
-			node.addParam(ParamType.NUM_ITERATIONS, (N!=-1) ? String.valueOf(N) : 
-															  String.valueOf(CostEstimator.FACTOR_NUM_ITERATIONS));
-
-			switch(fpb.getExecMode())
-			{
+			node.addParam(ParamType.NUM_ITERATIONS, (N!=-1) ? String.valueOf(N) :
+				String.valueOf(CostEstimator.FACTOR_NUM_ITERATIONS));
+			
+			switch(fpb.getExecMode()) {
 				case LOCAL:
 					node.setExecType(ExecType.CP);
 					break;
@@ -429,8 +426,7 @@ public class OptTreeConverter
 					node.setExecType(null);
 			}
 			
-			if( !topLevel )
-			{
+			if( !topLevel ) {
 				fsb.getFromHops().resetVisitStatus();
 				fsb.getToHops().resetVisitStatus();
 				if( fsb.getIncrementHops()!=null )
@@ -443,8 +439,7 @@ public class OptTreeConverter
 			
 			//process body
 			int len = fs.getBody().size();
-			for( int i=0; i<fpb.getChildBlocks().size() && i<len; i++ )
-			{
+			for( int i=0; i<fpb.getChildBlocks().size() && i<len; i++ ) {
 				ProgramBlock lpb = fpb.getChildBlocks().get(i);
 				StatementBlock lsb = fs.getBody().get(i);
 				node.addChild( rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );

http://git-wip-us.apache.org/repos/asf/systemml/blob/8dbc9302/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 9456703..b8da25a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -255,7 +255,8 @@ public class OptimizerRuleBased extends Optimizer
 		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) );
 		
 		//determine memory consumption for what-if: all-cp or partitioned 
-		double M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
+		double M2 = pn.isCPOnly() ? M1 :
+			_cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
 		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) );
 		double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true);
 		LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) );
@@ -898,17 +899,21 @@ public class OptimizerRuleBased extends Optimizer
 		return requiresRecompile;
 	}
 
-	protected boolean isLargeProblem(OptNode pn, double M0)
+	protected boolean isLargeProblem(OptNode pn, double M)
 	{
-		return ((_N >= PROB_SIZE_THRESHOLD_REMOTE || _Nmax >= 10 * PROB_SIZE_THRESHOLD_REMOTE )
-				&& M0 > PROB_SIZE_THRESHOLD_MB ); //original operations at least larger than 256MB
+		//TODO get a proper time estimate based to capture compute-intensive scenarios
+		
+		//rule-based decision based on number of outer iterations or maximum number of
+		//inner iterations (w/ appropriately scaled minimum data size threshold); 
+		return (_N >= PROB_SIZE_THRESHOLD_REMOTE && M > PROB_SIZE_THRESHOLD_MB)
+			|| (_Nmax >= 10 * PROB_SIZE_THRESHOLD_REMOTE && M > PROB_SIZE_THRESHOLD_MB/10);
 	}
 
 	protected boolean isCPOnlyPossible( OptNode n, double memBudget ) 
 		throws DMLRuntimeException
 	{
 		ExecType et = n.getExecType();
-		boolean ret = ( et == ExecType.CP);		
+		boolean ret = ( et == ExecType.CP);
 		
 		if( n.isLeaf() && et == getRemoteExecType() )
 		{


[2/3] systemml git commit: [SYSTEMML-1877] Fix setup parfor data-partition-execute (univar stats)

Posted by mb...@apache.org.
[SYSTEMML-1877] Fix setup parfor data-partition-execute (univar stats)

This patch fixes a setup issue for column-wise partitioning in the fused
parfor data-partition-execute Spark job. To avoid such issues in the
future, this also includes a refactoring that centralizes the analysis of
partition sizes, as well as test cases that force the fused
data-partition-execute job for both MR and Spark.

Additionally, this patch fixes the constrained parfor optimizer to
consider the fused data-partition-execute job (previously, this job was
never selected, even if both data partitioning and execution were forced
to their remote execution types).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9178a954
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9178a954
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9178a954

Branch: refs/heads/master
Commit: 9178a95495227e978dcc1049f04431ddbd2c4fc5
Parents: ec35215
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Aug 31 22:39:32 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Sep 1 12:13:25 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      |  29 ++++
 .../parfor/DataPartitionerRemoteMapper.java     |  28 ++--
 .../parfor/RemoteDPParForSparkWorker.java       |  21 +--
 .../parfor/RemoteDPParWorkerReducer.java        |  33 ++--
 .../RemoteParForColocatedNLineInputFormat.java  |  20 +--
 .../parfor/opt/OptimizerConstrained.java        |  21 ++-
 .../parfor/opt/OptimizerRuleBased.java          |   3 +-
 .../matrix/mapred/MRJobConfiguration.java       |  47 ++----
 .../parfor/ParForDataPartitionExecuteTest.java  | 153 +++++++++++++++++++
 .../functions/parfor/DataPartitionExecute.R     |  41 +++++
 .../functions/parfor/DataPartitionExecute.dml   |  42 +++++
 .../functions/parfor/ZPackageSuite.java         |   1 +
 12 files changed, 330 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 3a9bf51..ce8bbed 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -215,6 +215,11 @@ public class ParForProgramBlock extends ForProgramBlock
 			return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N 
 				|| _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
 		}
+		public boolean isRowwise() {
+			return _dpf == PDataPartitionFormat.ROW_WISE
+				|| _dpf == PDataPartitionFormat.ROW_BLOCK_WISE
+				|| _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
+		}
 		public long getNumParts(MatrixCharacteristics mc) {
 			switch( _dpf ) {
 				case ROW_WISE: return mc.getRows();
@@ -227,6 +232,30 @@ public class ParForProgramBlock extends ForProgramBlock
 					throw new RuntimeException("Unsupported partition format: "+_dpf);
 			}
 		}
+		public long getNumRows(MatrixCharacteristics mc) {
+			switch( _dpf ) {
+				case ROW_WISE: return 1;
+				case ROW_BLOCK_WISE: return mc.getRowsPerBlock();
+				case ROW_BLOCK_WISE_N: return _N;
+				case COLUMN_WISE: return mc.getRows();
+				case COLUMN_BLOCK_WISE: return mc.getRows();
+				case COLUMN_BLOCK_WISE_N: return mc.getRows();
+				default:
+					throw new RuntimeException("Unsupported partition format: "+_dpf);
+			}
+		}
+		public long getNumColumns(MatrixCharacteristics mc) {
+			switch( _dpf ) {
+				case ROW_WISE: return mc.getCols();
+				case ROW_BLOCK_WISE: return mc.getCols();
+				case ROW_BLOCK_WISE_N: return mc.getCols();
+				case COLUMN_WISE: return 1;
+				case COLUMN_BLOCK_WISE: return mc.getColsPerBlock();
+				case COLUMN_BLOCK_WISE_N: return _N;
+				default:
+					throw new RuntimeException("Unsupported partition format: "+_dpf);
+			}
+		}
 	}
 	
 	public enum PDataPartitioner {

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
index 6fcfc1a..e26201e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -48,15 +49,11 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class DataPartitionerRemoteMapper 
 	implements Mapper<Writable, Writable, Writable, Writable>
-{	
-	
+{
 	private DataPartitionerMapper _mapper = null;
 	
-	public DataPartitionerRemoteMapper( ) 
-	{
-		
-	}
-
+	public DataPartitionerRemoteMapper( ) { }
+	
 	@Override
 	public void map(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter) 
 		throws IOException
@@ -67,10 +64,7 @@ public class DataPartitionerRemoteMapper
 	@Override
 	public void configure(JobConf job)
 	{
-		long rlen = MRJobConfiguration.getPartitioningNumRows( job );
-		long clen = MRJobConfiguration.getPartitioningNumCols( job );
-		int brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-		int bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+		MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
 		InputInfo ii = MRJobConfiguration.getPartitioningInputInfo( job );
 		OutputInfo oi = MRJobConfiguration.getPartitioningOutputInfo( job );
 		PDataPartitionFormat pdf = MRJobConfiguration.getPartitioningFormat( job );
@@ -78,17 +72,21 @@ public class DataPartitionerRemoteMapper
 		boolean keepIndexes =  MRJobConfiguration.getPartitioningIndexFlag( job );
 		
 		if( ii == InputInfo.TextCellInputInfo )
-			_mapper = new DataPartitionerMapperTextcell(rlen, clen, brlen, bclen, pdf, n);
+			_mapper = new DataPartitionerMapperTextcell(mc.getRows(), mc.getCols(),
+				mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
 		else if( ii == InputInfo.BinaryCellInputInfo )
-			_mapper = new DataPartitionerMapperBinarycell(rlen, clen, brlen, bclen, pdf, n);
+			_mapper = new DataPartitionerMapperBinarycell(mc.getRows(), mc.getCols(),
+				mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
 		else if( ii == InputInfo.BinaryBlockInputInfo )
 		{
 			if( oi == OutputInfo.BinaryBlockOutputInfo )
-				_mapper = new DataPartitionerMapperBinaryblock(rlen, clen, brlen, bclen, pdf, n, keepIndexes);
+				_mapper = new DataPartitionerMapperBinaryblock(mc.getRows(), mc.getCols(),
+					mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes);
 			else if( oi == OutputInfo.BinaryCellOutputInfo )
 			{
 				boolean outputEmpty = MRJobConfiguration.getProgramBlocks(job)!=null; //fused parfor
-				_mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, rlen, clen, brlen, bclen, pdf, n, keepIndexes, outputEmpty); 
+				_mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, mc.getRows(), mc.getCols(),
+					mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes, outputEmpty); 
 			}
 			else
 				throw new RuntimeException("Partitioning from '"+ii+"' to '"+oi+"' not supported");

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index d20e7f3..dbc3fbf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -87,18 +87,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_aIters = aiters;
 		
 		//setup matrix block partition meta data
-		switch( dpf._dpf ) {
-			case ROW_WISE: 
-				_rlen = (int)mc.getRows(); _clen = 1; break;
-			case ROW_BLOCK_WISE_N:
-				_rlen = dpf._N; _clen = (int)mc.getCols(); break;
-			case COLUMN_BLOCK_WISE:
-				_rlen = 1; _clen = (int)mc.getCols(); break;
-			case COLUMN_BLOCK_WISE_N:
-				_rlen = (int)mc.getRows(); _clen = dpf._N; break;
-			default:
-				throw new RuntimeException("Unsupported partition format: "+dpf._dpf.name());
-		}
+		_rlen = (int)dpf.getNumRows(mc);
+		_clen = (int)dpf.getNumColumns(mc);
 		_brlen = mc.getRowsPerBlock();
 		_bclen = mc.getColsPerBlock();
 		_tSparseCol = tSparseCol;
@@ -237,8 +227,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 				int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
 				if( !partition.isInSparseFormat() ) //DENSE
 					partition.copy( row_offset, row_offset+pval.block.getNumRows()-1, 
-							   col_offset, col_offset+pval.block.getNumColumns()-1,
-							   pval.block, false ); 
+						col_offset, col_offset+pval.block.getNumColumns()-1,
+						pval.block, false ); 
 				else //SPARSE 
 					partition.appendToSparse(pval.block, row_offset, col_offset);
 				lnnz += pval.block.getNonZeros();
@@ -250,8 +240,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 			partition.setNonZeros(lnnz);
 			partition.examSparsity();
 		}
-		catch(DMLRuntimeException ex)
-		{
+		catch(DMLRuntimeException ex) {
 			throw new IOException(ex);
 		}
 		

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 3f26945..d022ac8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -32,6 +32,7 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -42,6 +43,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
@@ -66,18 +68,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
 	//reuse matrix partition
 	private MatrixBlock _partition = null; 
 	private boolean _tSparseCol = false;
-		
-	//MR ParWorker attributes  
-	protected String  _stringID       = null;
+	
+	//MR ParWorker attributes 
+	protected String _stringID = null;
 
 	//cached collector/reporter
 	protected OutputCollector<Writable, Writable> _out = null;
 	protected Reporter _report = null;
 
-	public RemoteDPParWorkerReducer() 
-	{
-		
-	}
+	public RemoteDPParWorkerReducer() { }
 	
 	@Override
 	public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
@@ -107,8 +106,7 @@ public class RemoteDPParWorkerReducer extends ParWorker
 			//execute program
 			executeTask( lTask );
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new IOException("ParFOR: Failed to execute task.",ex);
 		}
 		
@@ -120,18 +118,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
 	public void configure(JobConf job)
 	{
 		//Step 1: configure data partitioning information
-		_rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
-		_clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
-		_brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-		_bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+		_dpf = MRJobConfiguration.getPartitioningFormat( job );
+		MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
+		PartitionFormat pf = new PartitionFormat(_dpf, MRJobConfiguration.getPartitioningSizeN(job));
+		_rlen = (int)pf.getNumRows(mc);
+		_clen = (int)pf.getNumColumns(mc);
+		_brlen = mc.getRowsPerBlock();
+		_bclen = mc.getColsPerBlock();
 		_iterVar = MRJobConfiguration.getPartitioningItervar( job );
 		_inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
-		_dpf = MRJobConfiguration.getPartitioningFormat( job );		
-		switch( _dpf ) { //create matrix partition for reuse
-			case ROW_WISE:    _rlen = 1; break;
-			case COLUMN_WISE: _clen = 1; break;
-			default:  throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
-		}
 		_info = MRJobConfiguration.getPartitioningOutputInfo( job );
 		_tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); 
 		if( _tSparseCol )

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
index d5cc4bc..a2a231d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.lib.NLineInputFormat;
 
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
 /**
@@ -37,29 +39,21 @@ import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
  */
 public class RemoteParForColocatedNLineInputFormat extends NLineInputFormat
 {
-	
 	@Override
 	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
-	{	
+	{
 		InputSplit[] tmp = super.getSplits(job, numSplits);
 		
 		//get partitioning information
+		MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
 		PDataPartitionFormat dpf = MRJobConfiguration.getPartitioningFormat(job);
-		int blen = -1;
-		switch( dpf ) {
-			case ROW_WISE:          blen = 1; break;
-			case ROW_BLOCK_WISE:    blen = MRJobConfiguration.getPartitioningBlockNumRows(job); break;
-			case COLUMN_WISE:       blen = 1; break;
-			case COLUMN_BLOCK_WISE: blen = MRJobConfiguration.getPartitioningBlockNumCols(job); break;
-			default: 
-				//do nothing
-		}		
+		PartitionFormat pf = new PartitionFormat(dpf, -1);
+		int blen = (int) (pf.isRowwise() ? pf.getNumRows(mc) : pf.getNumColumns(mc));
 		String fname = MRJobConfiguration.getPartitioningFilename(job);
 
 		//create wrapper splits 
 		InputSplit[] ret = new InputSplit[ tmp.length ];
-		for( int i=0; i<tmp.length; i++ )
-		{
+		for( int i=0; i<tmp.length; i++ ) {
 			//check for robustness of subsequent cast
 			if( tmp[i] instanceof FileSplit ) 
 				ret[i] = new RemoteParForColocatedFileSplit( (FileSplit) tmp[i], fname, blen );

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 9e78c2a..1ca5631 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -231,19 +231,18 @@ public class OptimizerConstrained extends OptimizerRuleBased
 	protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PartitionFormat> partitionedMatrices, double thetaM)
 		throws DMLRuntimeException
 	{
-		boolean blockwise = false;
-
+		//call rewrite first to obtain partitioning information
+		String initPlan = n.getParam(ParamType.DATA_PARTITIONER);
+		boolean blockwise = super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
+		
 		// constraint awareness
-		if( !n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.UNSPECIFIED.toString()) )
-		{
-			Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
-			ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
-			pfpb.setDataPartitioner(PDataPartitioner.valueOf(n.getParam(ParamType.DATA_PARTITIONER)));
-			LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result="+n.getParam(ParamType.DATA_PARTITIONER) );
+		if( !initPlan.equals(PDataPartitioner.UNSPECIFIED.name()) ) {
+			ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
+				.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+			pfpb.setDataPartitioner(PDataPartitioner.valueOf(initPlan));
+			LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result=" + initPlan );
 		}
-		else
-			super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
-
+		
 		return blockwise;
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 154109a..9456703 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -100,7 +100,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
-import org.apache.sysml.runtime.instructions.gpu.context.GPUContext;
 import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -1505,7 +1504,7 @@ public class OptimizerRuleBased extends Optimizer
 			&& partitionedMatrices.size()==1 ) //only one partitioned matrix
 		{
 			ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-	                  .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+				.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
 			
 			//partitioned matrix
 			String moVarname = partitionedMatrices.keySet().iterator().next();

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 88512b6..5cd5daf 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -563,61 +563,42 @@ public class MRJobConfiguration
 		
 		//set transpose sparse column vector
 		job.setBoolean(PARTITIONING_OUTPUT_KEEP_INDEXES_CONFIG, keepIndexes);
-				
 	}
 	
-	public static long getPartitioningNumRows( JobConf job )
-	{
-		return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG));
+	public static MatrixCharacteristics getPartitionedMatrixSize(JobConf job) {
+		return new MatrixCharacteristics(
+			Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG)),
+			Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG)),
+			Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG)),
+			Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG)));
 	}
 	
-	public static long getPartitioningNumCols( JobConf job )
-	{
-		return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG));
-	}
 	
-	public static void setPartitioningBlockNumRows( JobConf job, int brlen )
-	{
+	public static void setPartitioningBlockNumRows( JobConf job, int brlen ) {
 		job.set(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG, String.valueOf(brlen));
 	}
 	
-	public static int getPartitioningBlockNumRows( JobConf job )
-	{
-		return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG));
-	}
-
-	public static void setPartitioningBlockNumCols( JobConf job, int bclen )
-	{
+	public static void setPartitioningBlockNumCols( JobConf job, int bclen ) {
 		job.set(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG,String.valueOf(bclen));
 	}
 	
-	public static int getPartitioningBlockNumCols( JobConf job )
-	{
-		return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG));
-	}
-	
-	public static InputInfo getPartitioningInputInfo( JobConf job )
-	{
+	public static InputInfo getPartitioningInputInfo( JobConf job ) {
 		return InputInfo.stringToInputInfo(job.get(PARTITIONING_INPUT_INFO_CONFIG));
 	}
 	
-	public static OutputInfo getPartitioningOutputInfo( JobConf job )
-	{
+	public static OutputInfo getPartitioningOutputInfo( JobConf job ) {
 		return OutputInfo.stringToOutputInfo(job.get(PARTITIONING_OUTPUT_INFO_CONFIG));
 	}
-
-	public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf )
-	{
+	
+	public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf ) {
 		job.set(PARTITIONING_OUTPUT_FORMAT_CONFIG, dpf.toString());
 	}
 	
-	public static PDataPartitionFormat getPartitioningFormat( JobConf job )
-	{
+	public static PDataPartitionFormat getPartitioningFormat( JobConf job )	{
 		return PDataPartitionFormat.valueOf(job.get(PARTITIONING_OUTPUT_FORMAT_CONFIG));
 	}
 	
-	public static int getPartitioningSizeN( JobConf job )
-	{
+	public static int getPartitioningSizeN( JobConf job ) {
 		return Integer.parseInt(job.get(PARTITIONING_OUTPUT_N_CONFIG));
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
new file mode 100644
index 0000000..80c7568
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.parfor;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class ParForDataPartitionExecuteTest extends AutomatedTestBase 
+{
+	private final static String TEST_NAME = "DataPartitionExecute";
+	private final static String TEST_DIR = "functions/parfor/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + ParForDataPartitionExecuteTest.class.getSimpleName() + "/";
+	private final static double eps = 1e-10;
+	
+	private final static int dim1 = 2001;
+	private final static int dim2 = 101;
+	private final static double sparsity1 = 0.7;
+	private final static double sparsity2 = 0.3d;
+	
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"R"}));
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowDenseMR() {
+		runFusedDataPartitionExecuteTest(false, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColDenseMR() {
+		runFusedDataPartitionExecuteTest(false, false, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowSparseMR() {
+		runFusedDataPartitionExecuteTest(true, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColSparseMR() {
+		runFusedDataPartitionExecuteTest(true, false, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowDenseSpark() {
+		runFusedDataPartitionExecuteTest(false, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColDenseSpark() {
+		runFusedDataPartitionExecuteTest(false, false, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowSparseSpark() {
+		runFusedDataPartitionExecuteTest(true, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColSparseSpark() {
+		runFusedDataPartitionExecuteTest(true, false, ExecType.SPARK);
+	}
+	
+	private void runFusedDataPartitionExecuteTest(boolean sparse, boolean row, ExecType et)
+	{
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		switch( et ){
+			case MR: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+			case SPARK: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break;
+			default: throw new RuntimeException("Unsupported exec type: "+et.name());
+		}
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( et == ExecType.SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		
+		//modify memory budget to trigger fused datapartition-execute
+		long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
+		InfrastructureAnalyzer.setLocalMaxMemory(1*1024*1024); //1MB
+		
+		try
+		{
+			int rows = row ? dim2 : dim1;
+			int cols = row ? dim1 : dim2;
+			
+			TestConfiguration config = getTestConfiguration(TEST_NAME);
+			config.addVariable("rows", rows);
+			config.addVariable("cols", cols);
+			loadTestConfiguration(config);
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{"-stats", "-args", input("X"), 
+				String.valueOf(et == ExecType.SPARK).toUpperCase(),
+				String.valueOf(row).toUpperCase(), output("R") };
+			
+			fullRScriptName = HOME + TEST_NAME + ".R";
+			rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() 
+				+ " " + String.valueOf(row).toUpperCase() + " " + expectedDir();
+			
+			//generate input data
+			double sparsity = sparse ? sparsity2 : sparsity1;
+			double[][] X = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
+			writeInputMatrixWithMTD("X", X, true);
+			
+			//run test case
+			runTest(true, false, null, -1);
+			runRScript(true);
+			
+			//compare matrices
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("R");
+			TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R");
+			
+			//check for compiled datapartition-execute
+			Assert.assertTrue(heavyHittersContainsSubString(
+				(et == ExecType.SPARK) ? "ParFor-DPESP" : "MR-Job_ParFor-DPEMR"));
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			InfrastructureAnalyzer.setLocalMaxMemory(oldmem); //1MB
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.R b/src/test/scripts/functions/parfor/DataPartitionExecute.R
new file mode 100644
index 0000000..8534606
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.R
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Reference implementation for DataPartitionExecute.dml; its output is compared against the DML result.
+# Usage: Rscript DataPartitionExecute.R <inputDir> <rowwise: TRUE|FALSE> <expectedDir>
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+X <- as.matrix(readMM(paste(args[1], "X.mtx", sep="")))  # args[1] is prefixed directly (sep=""), so it presumably ends with a path separator
+R = matrix(0, nrow(X), ncol(X)); 
+
+if( as.logical(args[2]) ) {  # row-wise: each row of R is pmin(row of X, sum of that row)
+   for(i in 1:nrow(X)) {  # NOTE(review): 1:n yields c(1,0) when n == 0 (same for the ncol loop); seq_len(n) would avoid that
+      R[i,] = pmin(X[i,], sum(X[i,]));
+   }
+} else {  # column-wise: each column of R is pmin(column of X, sum of that column)
+   for(i in 1:ncol(X)) {
+      R[,i] = pmin(X[,i], sum(X[,i]));
+   }
+}
+# Write the expected result as "R" (Matrix Market) into args[3]; the Java test reads it via readRMatrixFromFS("R").
+writeMM(as(R, "CsparseMatrix"), paste(args[3], "R", sep=""));

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.dml b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
new file mode 100644
index 0000000..0fd2299
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Caps each row (or column) of X elementwise at its sum inside a parfor whose opt=CONSTRAINED and *_DP mode request a data-partition-execute plan.
+X = read($1);  # $1: input matrix
+R = matrix(0, nrow(X), ncol(X));
+# The R reference computes the same result with pmin, i.e., min(matrix, scalar) here is applied elementwise.
+if( $2 ) {  # $2: TRUE -> REMOTE_SPARK partitioning/execution, FALSE -> REMOTE_MR
+  if( $3 )  # $3: TRUE -> row-wise (R[i,]), FALSE -> column-wise (R[,i])
+     parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+        R[i,] = min(X[i,], sum(X[i,]));
+  else
+     parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+        R[,i] = min(X[,i], sum(X[,i]));
+}
+else {
+  if( $3 )
+     parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+        R[i,] = min(X[i,], sum(X[i,]));
+  else
+     parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+        R[,i] = min(X[,i], sum(X[,i]));
+}
+# The Java test asserts the heavy hitters contain ParFor-DPESP (Spark) or MR-Job_ParFor-DPEMR (MR).
+write(R, $4);  # $4: output path

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
index 976b23f..c898a7a 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
 	ParForAdversarialLiteralsTest.class,
 	ParForBlockwiseDataPartitioningTest.class,
 	ParForColwiseDataPartitioningTest.class,
+	ParForDataPartitionExecuteTest.class,
 	ParForDataPartitionLeftIndexingTest.class,
 	ParForDependencyAnalysisTest.class,
 	ParForFunctionSerializationTest.class,