You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/09/01 19:12:10 UTC

[2/3] systemml git commit: [SYSTEMML-1877] Fix setup parfor data-partition-execute (univar stats)

[SYSTEMML-1877] Fix setup parfor data-partition-execute (univar stats)

This patch fixes a setup issue for column-wise partitioning in the fused
parfor data-partition-execute Spark job. To avoid such issues in the
future, this also includes a refactoring to centralize the analysis of
partition sizes as well as test cases that force the fused
data-partition-execute job for both MR and Spark.

Additionally, this patch fixes the constrained parfor optimizer to
consider the fused data-partition-execute job (previously, this job was
never selected, even if both data partitioning and execution were forced
to their remote execution types).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9178a954
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9178a954
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9178a954

Branch: refs/heads/master
Commit: 9178a95495227e978dcc1049f04431ddbd2c4fc5
Parents: ec35215
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Aug 31 22:39:32 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Sep 1 12:13:25 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      |  29 ++++
 .../parfor/DataPartitionerRemoteMapper.java     |  28 ++--
 .../parfor/RemoteDPParForSparkWorker.java       |  21 +--
 .../parfor/RemoteDPParWorkerReducer.java        |  33 ++--
 .../RemoteParForColocatedNLineInputFormat.java  |  20 +--
 .../parfor/opt/OptimizerConstrained.java        |  21 ++-
 .../parfor/opt/OptimizerRuleBased.java          |   3 +-
 .../matrix/mapred/MRJobConfiguration.java       |  47 ++----
 .../parfor/ParForDataPartitionExecuteTest.java  | 153 +++++++++++++++++++
 .../functions/parfor/DataPartitionExecute.R     |  41 +++++
 .../functions/parfor/DataPartitionExecute.dml   |  42 +++++
 .../functions/parfor/ZPackageSuite.java         |   1 +
 12 files changed, 330 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 3a9bf51..ce8bbed 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -215,6 +215,11 @@ public class ParForProgramBlock extends ForProgramBlock
 			return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N 
 				|| _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
 		}
+		public boolean isRowwise() {
+			return _dpf == PDataPartitionFormat.ROW_WISE
+				|| _dpf == PDataPartitionFormat.ROW_BLOCK_WISE
+				|| _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
+		}
 		public long getNumParts(MatrixCharacteristics mc) {
 			switch( _dpf ) {
 				case ROW_WISE: return mc.getRows();
@@ -227,6 +232,30 @@ public class ParForProgramBlock extends ForProgramBlock
 					throw new RuntimeException("Unsupported partition format: "+_dpf);
 			}
 		}
+		public long getNumRows(MatrixCharacteristics mc) {
+			switch( _dpf ) {
+				case ROW_WISE: return 1;
+				case ROW_BLOCK_WISE: return mc.getRowsPerBlock();
+				case ROW_BLOCK_WISE_N: return _N;
+				case COLUMN_WISE: return mc.getRows();
+				case COLUMN_BLOCK_WISE: return mc.getRows();
+				case COLUMN_BLOCK_WISE_N: return mc.getRows();
+				default:
+					throw new RuntimeException("Unsupported partition format: "+_dpf);
+			}
+		}
+		public long getNumColumns(MatrixCharacteristics mc) {
+			switch( _dpf ) {
+				case ROW_WISE: return mc.getCols();
+				case ROW_BLOCK_WISE: return mc.getCols();
+				case ROW_BLOCK_WISE_N: return mc.getCols();
+				case COLUMN_WISE: return 1;
+				case COLUMN_BLOCK_WISE: return mc.getColsPerBlock();
+				case COLUMN_BLOCK_WISE_N: return _N;
+				default:
+					throw new RuntimeException("Unsupported partition format: "+_dpf);
+			}
+		}
 	}
 	
 	public enum PDataPartitioner {

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
index 6fcfc1a..e26201e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -48,15 +49,11 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class DataPartitionerRemoteMapper 
 	implements Mapper<Writable, Writable, Writable, Writable>
-{	
-	
+{
 	private DataPartitionerMapper _mapper = null;
 	
-	public DataPartitionerRemoteMapper( ) 
-	{
-		
-	}
-
+	public DataPartitionerRemoteMapper( ) { }
+	
 	@Override
 	public void map(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter) 
 		throws IOException
@@ -67,10 +64,7 @@ public class DataPartitionerRemoteMapper
 	@Override
 	public void configure(JobConf job)
 	{
-		long rlen = MRJobConfiguration.getPartitioningNumRows( job );
-		long clen = MRJobConfiguration.getPartitioningNumCols( job );
-		int brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-		int bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+		MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
 		InputInfo ii = MRJobConfiguration.getPartitioningInputInfo( job );
 		OutputInfo oi = MRJobConfiguration.getPartitioningOutputInfo( job );
 		PDataPartitionFormat pdf = MRJobConfiguration.getPartitioningFormat( job );
@@ -78,17 +72,21 @@ public class DataPartitionerRemoteMapper
 		boolean keepIndexes =  MRJobConfiguration.getPartitioningIndexFlag( job );
 		
 		if( ii == InputInfo.TextCellInputInfo )
-			_mapper = new DataPartitionerMapperTextcell(rlen, clen, brlen, bclen, pdf, n);
+			_mapper = new DataPartitionerMapperTextcell(mc.getRows(), mc.getCols(),
+				mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
 		else if( ii == InputInfo.BinaryCellInputInfo )
-			_mapper = new DataPartitionerMapperBinarycell(rlen, clen, brlen, bclen, pdf, n);
+			_mapper = new DataPartitionerMapperBinarycell(mc.getRows(), mc.getCols(),
+				mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
 		else if( ii == InputInfo.BinaryBlockInputInfo )
 		{
 			if( oi == OutputInfo.BinaryBlockOutputInfo )
-				_mapper = new DataPartitionerMapperBinaryblock(rlen, clen, brlen, bclen, pdf, n, keepIndexes);
+				_mapper = new DataPartitionerMapperBinaryblock(mc.getRows(), mc.getCols(),
+					mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes);
 			else if( oi == OutputInfo.BinaryCellOutputInfo )
 			{
 				boolean outputEmpty = MRJobConfiguration.getProgramBlocks(job)!=null; //fused parfor
-				_mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, rlen, clen, brlen, bclen, pdf, n, keepIndexes, outputEmpty); 
+				_mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, mc.getRows(), mc.getCols(),
+					mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes, outputEmpty); 
 			}
 			else
 				throw new RuntimeException("Partitioning from '"+ii+"' to '"+oi+"' not supported");

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index d20e7f3..dbc3fbf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -87,18 +87,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_aIters = aiters;
 		
 		//setup matrix block partition meta data
-		switch( dpf._dpf ) {
-			case ROW_WISE: 
-				_rlen = (int)mc.getRows(); _clen = 1; break;
-			case ROW_BLOCK_WISE_N:
-				_rlen = dpf._N; _clen = (int)mc.getCols(); break;
-			case COLUMN_BLOCK_WISE:
-				_rlen = 1; _clen = (int)mc.getCols(); break;
-			case COLUMN_BLOCK_WISE_N:
-				_rlen = (int)mc.getRows(); _clen = dpf._N; break;
-			default:
-				throw new RuntimeException("Unsupported partition format: "+dpf._dpf.name());
-		}
+		_rlen = (int)dpf.getNumRows(mc);
+		_clen = (int)dpf.getNumColumns(mc);
 		_brlen = mc.getRowsPerBlock();
 		_bclen = mc.getColsPerBlock();
 		_tSparseCol = tSparseCol;
@@ -237,8 +227,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 				int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
 				if( !partition.isInSparseFormat() ) //DENSE
 					partition.copy( row_offset, row_offset+pval.block.getNumRows()-1, 
-							   col_offset, col_offset+pval.block.getNumColumns()-1,
-							   pval.block, false ); 
+						col_offset, col_offset+pval.block.getNumColumns()-1,
+						pval.block, false ); 
 				else //SPARSE 
 					partition.appendToSparse(pval.block, row_offset, col_offset);
 				lnnz += pval.block.getNonZeros();
@@ -250,8 +240,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 			partition.setNonZeros(lnnz);
 			partition.examSparsity();
 		}
-		catch(DMLRuntimeException ex)
-		{
+		catch(DMLRuntimeException ex) {
 			throw new IOException(ex);
 		}
 		

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 3f26945..d022ac8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -32,6 +32,7 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -42,6 +43,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
@@ -66,18 +68,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
 	//reuse matrix partition
 	private MatrixBlock _partition = null; 
 	private boolean _tSparseCol = false;
-		
-	//MR ParWorker attributes  
-	protected String  _stringID       = null;
+	
+	//MR ParWorker attributes 
+	protected String _stringID = null;
 
 	//cached collector/reporter
 	protected OutputCollector<Writable, Writable> _out = null;
 	protected Reporter _report = null;
 
-	public RemoteDPParWorkerReducer() 
-	{
-		
-	}
+	public RemoteDPParWorkerReducer() { }
 	
 	@Override
 	public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
@@ -107,8 +106,7 @@ public class RemoteDPParWorkerReducer extends ParWorker
 			//execute program
 			executeTask( lTask );
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new IOException("ParFOR: Failed to execute task.",ex);
 		}
 		
@@ -120,18 +118,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
 	public void configure(JobConf job)
 	{
 		//Step 1: configure data partitioning information
-		_rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
-		_clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
-		_brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-		_bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+		_dpf = MRJobConfiguration.getPartitioningFormat( job );
+		MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
+		PartitionFormat pf = new PartitionFormat(_dpf, MRJobConfiguration.getPartitioningSizeN(job));
+		_rlen = (int)pf.getNumRows(mc);
+		_clen = (int)pf.getNumColumns(mc);
+		_brlen = mc.getRowsPerBlock();
+		_bclen = mc.getColsPerBlock();
 		_iterVar = MRJobConfiguration.getPartitioningItervar( job );
 		_inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
-		_dpf = MRJobConfiguration.getPartitioningFormat( job );		
-		switch( _dpf ) { //create matrix partition for reuse
-			case ROW_WISE:    _rlen = 1; break;
-			case COLUMN_WISE: _clen = 1; break;
-			default:  throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
-		}
 		_info = MRJobConfiguration.getPartitioningOutputInfo( job );
 		_tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); 
 		if( _tSparseCol )

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
index d5cc4bc..a2a231d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.lib.NLineInputFormat;
 
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
 /**
@@ -37,29 +39,21 @@ import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
  */
 public class RemoteParForColocatedNLineInputFormat extends NLineInputFormat
 {
-	
 	@Override
 	public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
-	{	
+	{
 		InputSplit[] tmp = super.getSplits(job, numSplits);
 		
 		//get partitioning information
+		MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
 		PDataPartitionFormat dpf = MRJobConfiguration.getPartitioningFormat(job);
-		int blen = -1;
-		switch( dpf ) {
-			case ROW_WISE:          blen = 1; break;
-			case ROW_BLOCK_WISE:    blen = MRJobConfiguration.getPartitioningBlockNumRows(job); break;
-			case COLUMN_WISE:       blen = 1; break;
-			case COLUMN_BLOCK_WISE: blen = MRJobConfiguration.getPartitioningBlockNumCols(job); break;
-			default: 
-				//do nothing
-		}		
+		PartitionFormat pf = new PartitionFormat(dpf, -1);
+		int blen = (int) (pf.isRowwise() ? pf.getNumRows(mc) : pf.getNumColumns(mc));
 		String fname = MRJobConfiguration.getPartitioningFilename(job);
 
 		//create wrapper splits 
 		InputSplit[] ret = new InputSplit[ tmp.length ];
-		for( int i=0; i<tmp.length; i++ )
-		{
+		for( int i=0; i<tmp.length; i++ ) {
 			//check for robustness of subsequent cast
 			if( tmp[i] instanceof FileSplit ) 
 				ret[i] = new RemoteParForColocatedFileSplit( (FileSplit) tmp[i], fname, blen );

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 9e78c2a..1ca5631 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -231,19 +231,18 @@ public class OptimizerConstrained extends OptimizerRuleBased
 	protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PartitionFormat> partitionedMatrices, double thetaM)
 		throws DMLRuntimeException
 	{
-		boolean blockwise = false;
-
+		//call rewrite first to obtain partitioning information
+		String initPlan = n.getParam(ParamType.DATA_PARTITIONER);
+		boolean blockwise = super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
+		
 		// constraint awareness
-		if( !n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.UNSPECIFIED.toString()) )
-		{
-			Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
-			ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
-			pfpb.setDataPartitioner(PDataPartitioner.valueOf(n.getParam(ParamType.DATA_PARTITIONER)));
-			LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result="+n.getParam(ParamType.DATA_PARTITIONER) );
+		if( !initPlan.equals(PDataPartitioner.UNSPECIFIED.name()) ) {
+			ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
+				.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+			pfpb.setDataPartitioner(PDataPartitioner.valueOf(initPlan));
+			LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result=" + initPlan );
 		}
-		else
-			super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
-
+		
 		return blockwise;
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 154109a..9456703 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -100,7 +100,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
-import org.apache.sysml.runtime.instructions.gpu.context.GPUContext;
 import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -1505,7 +1504,7 @@ public class OptimizerRuleBased extends Optimizer
 			&& partitionedMatrices.size()==1 ) //only one partitioned matrix
 		{
 			ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-	                  .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+				.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
 			
 			//partitioned matrix
 			String moVarname = partitionedMatrices.keySet().iterator().next();

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 88512b6..5cd5daf 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -563,61 +563,42 @@ public class MRJobConfiguration
 		
 		//set transpose sparse column vector
 		job.setBoolean(PARTITIONING_OUTPUT_KEEP_INDEXES_CONFIG, keepIndexes);
-				
 	}
 	
-	public static long getPartitioningNumRows( JobConf job )
-	{
-		return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG));
+	public static MatrixCharacteristics getPartitionedMatrixSize(JobConf job) {
+		return new MatrixCharacteristics(
+			Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG)),
+			Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG)),
+			Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG)),
+			Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG)));
 	}
 	
-	public static long getPartitioningNumCols( JobConf job )
-	{
-		return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG));
-	}
 	
-	public static void setPartitioningBlockNumRows( JobConf job, int brlen )
-	{
+	public static void setPartitioningBlockNumRows( JobConf job, int brlen ) {
 		job.set(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG, String.valueOf(brlen));
 	}
 	
-	public static int getPartitioningBlockNumRows( JobConf job )
-	{
-		return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG));
-	}
-
-	public static void setPartitioningBlockNumCols( JobConf job, int bclen )
-	{
+	public static void setPartitioningBlockNumCols( JobConf job, int bclen ) {
 		job.set(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG,String.valueOf(bclen));
 	}
 	
-	public static int getPartitioningBlockNumCols( JobConf job )
-	{
-		return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG));
-	}
-	
-	public static InputInfo getPartitioningInputInfo( JobConf job )
-	{
+	public static InputInfo getPartitioningInputInfo( JobConf job ) {
 		return InputInfo.stringToInputInfo(job.get(PARTITIONING_INPUT_INFO_CONFIG));
 	}
 	
-	public static OutputInfo getPartitioningOutputInfo( JobConf job )
-	{
+	public static OutputInfo getPartitioningOutputInfo( JobConf job ) {
 		return OutputInfo.stringToOutputInfo(job.get(PARTITIONING_OUTPUT_INFO_CONFIG));
 	}
-
-	public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf )
-	{
+	
+	public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf ) {
 		job.set(PARTITIONING_OUTPUT_FORMAT_CONFIG, dpf.toString());
 	}
 	
-	public static PDataPartitionFormat getPartitioningFormat( JobConf job )
-	{
+	public static PDataPartitionFormat getPartitioningFormat( JobConf job )	{
 		return PDataPartitionFormat.valueOf(job.get(PARTITIONING_OUTPUT_FORMAT_CONFIG));
 	}
 	
-	public static int getPartitioningSizeN( JobConf job )
-	{
+	public static int getPartitioningSizeN( JobConf job ) {
 		return Integer.parseInt(job.get(PARTITIONING_OUTPUT_N_CONFIG));
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
new file mode 100644
index 0000000..80c7568
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.parfor;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class ParForDataPartitionExecuteTest extends AutomatedTestBase 
+{
+	private final static String TEST_NAME = "DataPartitionExecute";
+	private final static String TEST_DIR = "functions/parfor/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + ParForDataPartitionExecuteTest.class.getSimpleName() + "/";
+	private final static double eps = 1e-10;
+	
+	private final static int dim1 = 2001;
+	private final static int dim2 = 101;
+	private final static double sparsity1 = 0.7;
+	private final static double sparsity2 = 0.3d;
+	
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"R"}));
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowDenseMR() {
+		runFusedDataPartitionExecuteTest(false, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColDenseMR() {
+		runFusedDataPartitionExecuteTest(false, false, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowSparseMR() {
+		runFusedDataPartitionExecuteTest(true, true, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColSparseMR() {
+		runFusedDataPartitionExecuteTest(true, false, ExecType.MR);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowDenseSpark() {
+		runFusedDataPartitionExecuteTest(false, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColDenseSpark() {
+		runFusedDataPartitionExecuteTest(false, false, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteRowSparseSpark() {
+		runFusedDataPartitionExecuteTest(true, true, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testFusedDataPartitionExecuteColSparseSpark() {
+		runFusedDataPartitionExecuteTest(true, false, ExecType.SPARK);
+	}
+	
+	private void runFusedDataPartitionExecuteTest(boolean sparse, boolean row, ExecType et)
+	{
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		switch( et ){
+			case MR: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+			case SPARK: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break;
+			default: throw new RuntimeException("Unsupported exec type: "+et.name());
+		}
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( et == ExecType.SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		
+		//modify memory budget to trigger fused datapartition-execute
+		long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
+		InfrastructureAnalyzer.setLocalMaxMemory(1*1024*1024); //1MB
+		
+		try
+		{
+			int rows = row ? dim2 : dim1;
+			int cols = row ? dim1 : dim2;
+			
+			TestConfiguration config = getTestConfiguration(TEST_NAME);
+			config.addVariable("rows", rows);
+			config.addVariable("cols", cols);
+			loadTestConfiguration(config);
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{"-stats", "-args", input("X"), 
+				String.valueOf(et == ExecType.SPARK).toUpperCase(),
+				String.valueOf(row).toUpperCase(), output("R") };
+			
+			fullRScriptName = HOME + TEST_NAME + ".R";
+			rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() 
+				+ " " + String.valueOf(row).toUpperCase() + " " + expectedDir();
+			
+			//generate input data
+			double sparsity = sparse ? sparsity2 : sparsity1;
+			double[][] X = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
+			writeInputMatrixWithMTD("X", X, true);
+			
+			//run test case
+			runTest(true, false, null, -1);
+			runRScript(true);
+			
+			//compare matrices
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("R");
+			TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R");
+			
+			//check for compiled datapartition-execute
+			Assert.assertTrue(heavyHittersContainsSubString(
+				(et == ExecType.SPARK) ? "ParFor-DPESP" : "MR-Job_ParFor-DPEMR"));
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			InfrastructureAnalyzer.setLocalMaxMemory(oldmem); //1MB
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.R b/src/test/scripts/functions/parfor/DataPartitionExecute.R
new file mode 100644
index 0000000..8534606
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.R
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+X <- as.matrix(readMM(paste(args[1], "X.mtx", sep="")))
+R = matrix(0, nrow(X), ncol(X)); 
+
+if( as.logical(args[2]) ) {
+   for(i in 1:nrow(X)) {
+      R[i,] = pmin(X[i,], sum(X[i,]));
+   }
+} else {
+   for(i in 1:ncol(X)) {
+      R[,i] = pmin(X[,i], sum(X[,i]));
+   }
+}
+
+writeMM(as(R, "CsparseMatrix"), paste(args[3], "R", sep=""));

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.dml b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
new file mode 100644
index 0000000..0fd2299
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1);
+R = matrix(0, nrow(X), ncol(X));
+
+if( $2 ) {
+  if( $3 )
+     parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+        R[i,] = min(X[i,], sum(X[i,]));
+  else
+     parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+        R[,i] = min(X[,i], sum(X[,i]));
+}
+else {
+  if( $3 )
+     parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+        R[i,] = min(X[i,], sum(X[i,]));
+  else
+     parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+        R[,i] = min(X[,i], sum(X[,i]));
+}
+
+write(R, $4);

http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
index 976b23f..c898a7a 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
 	ParForAdversarialLiteralsTest.class,
 	ParForBlockwiseDataPartitioningTest.class,
 	ParForColwiseDataPartitioningTest.class,
+	ParForDataPartitionExecuteTest.class,
 	ParForDataPartitionLeftIndexingTest.class,
 	ParForDependencyAnalysisTest.class,
 	ParForFunctionSerializationTest.class,