You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/09/01 19:12:10 UTC
[2/3] systemml git commit: [SYSTEMML-1877] Fix setup parfor
data-partition-execute (univar stats)
[SYSTEMML-1877] Fix setup parfor data-partition-execute (univar stats)
This patch fixes a setup issue for column-wise partitioning in the fused
parfor data-partition-execute Spark job. To avoid such issues in the
future, this also includes a refactoring to centralize the analysis of
partition sizes as well as test cases that force the fused
data-partition-execute job for both MR and Spark.
Additionally, this patch fixes the constrained parfor optimizer to
consider the fused data-partition-execute job (so far, this job was
never selected even if both data partitioning and execution were forced
to their remote execution types).
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9178a954
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9178a954
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9178a954
Branch: refs/heads/master
Commit: 9178a95495227e978dcc1049f04431ddbd2c4fc5
Parents: ec35215
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Aug 31 22:39:32 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Sep 1 12:13:25 2017 -0700
----------------------------------------------------------------------
.../controlprogram/ParForProgramBlock.java | 29 ++++
.../parfor/DataPartitionerRemoteMapper.java | 28 ++--
.../parfor/RemoteDPParForSparkWorker.java | 21 +--
.../parfor/RemoteDPParWorkerReducer.java | 33 ++--
.../RemoteParForColocatedNLineInputFormat.java | 20 +--
.../parfor/opt/OptimizerConstrained.java | 21 ++-
.../parfor/opt/OptimizerRuleBased.java | 3 +-
.../matrix/mapred/MRJobConfiguration.java | 47 ++----
.../parfor/ParForDataPartitionExecuteTest.java | 153 +++++++++++++++++++
.../functions/parfor/DataPartitionExecute.R | 41 +++++
.../functions/parfor/DataPartitionExecute.dml | 42 +++++
.../functions/parfor/ZPackageSuite.java | 1 +
12 files changed, 330 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 3a9bf51..ce8bbed 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -215,6 +215,11 @@ public class ParForProgramBlock extends ForProgramBlock
return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N
|| _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
}
+ public boolean isRowwise() {
+ return _dpf == PDataPartitionFormat.ROW_WISE
+ || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE
+ || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N;
+ }
public long getNumParts(MatrixCharacteristics mc) {
switch( _dpf ) {
case ROW_WISE: return mc.getRows();
@@ -227,6 +232,30 @@ public class ParForProgramBlock extends ForProgramBlock
throw new RuntimeException("Unsupported partition format: "+_dpf);
}
}
+ public long getNumRows(MatrixCharacteristics mc) {
+ switch( _dpf ) {
+ case ROW_WISE: return 1;
+ case ROW_BLOCK_WISE: return mc.getRowsPerBlock();
+ case ROW_BLOCK_WISE_N: return _N;
+ case COLUMN_WISE: return mc.getRows();
+ case COLUMN_BLOCK_WISE: return mc.getRows();
+ case COLUMN_BLOCK_WISE_N: return mc.getRows();
+ default:
+ throw new RuntimeException("Unsupported partition format: "+_dpf);
+ }
+ }
+ public long getNumColumns(MatrixCharacteristics mc) {
+ switch( _dpf ) {
+ case ROW_WISE: return mc.getCols();
+ case ROW_BLOCK_WISE: return mc.getCols();
+ case ROW_BLOCK_WISE_N: return mc.getCols();
+ case COLUMN_WISE: return 1;
+ case COLUMN_BLOCK_WISE: return mc.getColsPerBlock();
+ case COLUMN_BLOCK_WISE_N: return _N;
+ default:
+ throw new RuntimeException("Unsupported partition format: "+_dpf);
+ }
+ }
}
public enum PDataPartitioner {
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
index 6fcfc1a..e26201e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.IJV;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -48,15 +49,11 @@ import org.apache.sysml.runtime.util.MapReduceTool;
*/
public class DataPartitionerRemoteMapper
implements Mapper<Writable, Writable, Writable, Writable>
-{
-
+{
private DataPartitionerMapper _mapper = null;
- public DataPartitionerRemoteMapper( )
- {
-
- }
-
+ public DataPartitionerRemoteMapper( ) { }
+
@Override
public void map(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
throws IOException
@@ -67,10 +64,7 @@ public class DataPartitionerRemoteMapper
@Override
public void configure(JobConf job)
{
- long rlen = MRJobConfiguration.getPartitioningNumRows( job );
- long clen = MRJobConfiguration.getPartitioningNumCols( job );
- int brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
- int bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+ MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
InputInfo ii = MRJobConfiguration.getPartitioningInputInfo( job );
OutputInfo oi = MRJobConfiguration.getPartitioningOutputInfo( job );
PDataPartitionFormat pdf = MRJobConfiguration.getPartitioningFormat( job );
@@ -78,17 +72,21 @@ public class DataPartitionerRemoteMapper
boolean keepIndexes = MRJobConfiguration.getPartitioningIndexFlag( job );
if( ii == InputInfo.TextCellInputInfo )
- _mapper = new DataPartitionerMapperTextcell(rlen, clen, brlen, bclen, pdf, n);
+ _mapper = new DataPartitionerMapperTextcell(mc.getRows(), mc.getCols(),
+ mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
else if( ii == InputInfo.BinaryCellInputInfo )
- _mapper = new DataPartitionerMapperBinarycell(rlen, clen, brlen, bclen, pdf, n);
+ _mapper = new DataPartitionerMapperBinarycell(mc.getRows(), mc.getCols(),
+ mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n);
else if( ii == InputInfo.BinaryBlockInputInfo )
{
if( oi == OutputInfo.BinaryBlockOutputInfo )
- _mapper = new DataPartitionerMapperBinaryblock(rlen, clen, brlen, bclen, pdf, n, keepIndexes);
+ _mapper = new DataPartitionerMapperBinaryblock(mc.getRows(), mc.getCols(),
+ mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes);
else if( oi == OutputInfo.BinaryCellOutputInfo )
{
boolean outputEmpty = MRJobConfiguration.getProgramBlocks(job)!=null; //fused parfor
- _mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, rlen, clen, brlen, bclen, pdf, n, keepIndexes, outputEmpty);
+ _mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, mc.getRows(), mc.getCols(),
+ mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes, outputEmpty);
}
else
throw new RuntimeException("Partitioning from '"+ii+"' to '"+oi+"' not supported");
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index d20e7f3..dbc3fbf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -87,18 +87,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
_aIters = aiters;
//setup matrix block partition meta data
- switch( dpf._dpf ) {
- case ROW_WISE:
- _rlen = (int)mc.getRows(); _clen = 1; break;
- case ROW_BLOCK_WISE_N:
- _rlen = dpf._N; _clen = (int)mc.getCols(); break;
- case COLUMN_BLOCK_WISE:
- _rlen = 1; _clen = (int)mc.getCols(); break;
- case COLUMN_BLOCK_WISE_N:
- _rlen = (int)mc.getRows(); _clen = dpf._N; break;
- default:
- throw new RuntimeException("Unsupported partition format: "+dpf._dpf.name());
- }
+ _rlen = (int)dpf.getNumRows(mc);
+ _clen = (int)dpf.getNumColumns(mc);
_brlen = mc.getRowsPerBlock();
_bclen = mc.getColsPerBlock();
_tSparseCol = tSparseCol;
@@ -237,8 +227,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
if( !partition.isInSparseFormat() ) //DENSE
partition.copy( row_offset, row_offset+pval.block.getNumRows()-1,
- col_offset, col_offset+pval.block.getNumColumns()-1,
- pval.block, false );
+ col_offset, col_offset+pval.block.getNumColumns()-1,
+ pval.block, false );
else //SPARSE
partition.appendToSparse(pval.block, row_offset, col_offset);
lnnz += pval.block.getNonZeros();
@@ -250,8 +240,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
partition.setNonZeros(lnnz);
partition.examSparsity();
}
- catch(DMLRuntimeException ex)
- {
+ catch(DMLRuntimeException ex) {
throw new IOException(ex);
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 3f26945..d022ac8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -32,6 +32,7 @@ import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -42,6 +43,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
@@ -66,18 +68,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
//reuse matrix partition
private MatrixBlock _partition = null;
private boolean _tSparseCol = false;
-
- //MR ParWorker attributes
- protected String _stringID = null;
+
+ //MR ParWorker attributes
+ protected String _stringID = null;
//cached collector/reporter
protected OutputCollector<Writable, Writable> _out = null;
protected Reporter _report = null;
- public RemoteDPParWorkerReducer()
- {
-
- }
+ public RemoteDPParWorkerReducer() { }
@Override
public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
@@ -107,8 +106,7 @@ public class RemoteDPParWorkerReducer extends ParWorker
//execute program
executeTask( lTask );
}
- catch(Exception ex)
- {
+ catch(Exception ex) {
throw new IOException("ParFOR: Failed to execute task.",ex);
}
@@ -120,18 +118,15 @@ public class RemoteDPParWorkerReducer extends ParWorker
public void configure(JobConf job)
{
//Step 1: configure data partitioning information
- _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
- _clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
- _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
- _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+ _dpf = MRJobConfiguration.getPartitioningFormat( job );
+ MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
+ PartitionFormat pf = new PartitionFormat(_dpf, MRJobConfiguration.getPartitioningSizeN(job));
+ _rlen = (int)pf.getNumRows(mc);
+ _clen = (int)pf.getNumColumns(mc);
+ _brlen = mc.getRowsPerBlock();
+ _bclen = mc.getColsPerBlock();
_iterVar = MRJobConfiguration.getPartitioningItervar( job );
_inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
- _dpf = MRJobConfiguration.getPartitioningFormat( job );
- switch( _dpf ) { //create matrix partition for reuse
- case ROW_WISE: _rlen = 1; break;
- case COLUMN_WISE: _clen = 1; break;
- default: throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
- }
_info = MRJobConfiguration.getPartitioningOutputInfo( job );
_tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job );
if( _tSparseCol )
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
index d5cc4bc..a2a231d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NLineInputFormat;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
/**
@@ -37,29 +39,21 @@ import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
*/
public class RemoteParForColocatedNLineInputFormat extends NLineInputFormat
{
-
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
- {
+ {
InputSplit[] tmp = super.getSplits(job, numSplits);
//get partitioning information
+ MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job);
PDataPartitionFormat dpf = MRJobConfiguration.getPartitioningFormat(job);
- int blen = -1;
- switch( dpf ) {
- case ROW_WISE: blen = 1; break;
- case ROW_BLOCK_WISE: blen = MRJobConfiguration.getPartitioningBlockNumRows(job); break;
- case COLUMN_WISE: blen = 1; break;
- case COLUMN_BLOCK_WISE: blen = MRJobConfiguration.getPartitioningBlockNumCols(job); break;
- default:
- //do nothing
- }
+ PartitionFormat pf = new PartitionFormat(dpf, -1);
+ int blen = (int) (pf.isRowwise() ? pf.getNumRows(mc) : pf.getNumColumns(mc));
String fname = MRJobConfiguration.getPartitioningFilename(job);
//create wrapper splits
InputSplit[] ret = new InputSplit[ tmp.length ];
- for( int i=0; i<tmp.length; i++ )
- {
+ for( int i=0; i<tmp.length; i++ ) {
//check for robustness of subsequent cast
if( tmp[i] instanceof FileSplit )
ret[i] = new RemoteParForColocatedFileSplit( (FileSplit) tmp[i], fname, blen );
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 9e78c2a..1ca5631 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -231,19 +231,18 @@ public class OptimizerConstrained extends OptimizerRuleBased
protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PartitionFormat> partitionedMatrices, double thetaM)
throws DMLRuntimeException
{
- boolean blockwise = false;
-
+ //call rewrite first to obtain partitioning information
+ String initPlan = n.getParam(ParamType.DATA_PARTITIONER);
+ boolean blockwise = super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
+
// constraint awareness
- if( !n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.UNSPECIFIED.toString()) )
- {
- Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
- ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
- pfpb.setDataPartitioner(PDataPartitioner.valueOf(n.getParam(ParamType.DATA_PARTITIONER)));
- LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result="+n.getParam(ParamType.DATA_PARTITIONER) );
+ if( !initPlan.equals(PDataPartitioner.UNSPECIFIED.name()) ) {
+ ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
+ .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ pfpb.setDataPartitioner(PDataPartitioner.valueOf(initPlan));
+ LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result=" + initPlan );
}
- else
- super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
-
+
return blockwise;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 154109a..9456703 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -100,7 +100,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
-import org.apache.sysml.runtime.instructions.gpu.context.GPUContext;
import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool;
import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -1505,7 +1504,7 @@ public class OptimizerRuleBased extends Optimizer
&& partitionedMatrices.size()==1 ) //only one partitioned matrix
{
ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+ .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
//partitioned matrix
String moVarname = partitionedMatrices.keySet().iterator().next();
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 88512b6..5cd5daf 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -563,61 +563,42 @@ public class MRJobConfiguration
//set transpose sparse column vector
job.setBoolean(PARTITIONING_OUTPUT_KEEP_INDEXES_CONFIG, keepIndexes);
-
}
- public static long getPartitioningNumRows( JobConf job )
- {
- return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG));
+ public static MatrixCharacteristics getPartitionedMatrixSize(JobConf job) {
+ return new MatrixCharacteristics(
+ Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG)),
+ Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG)),
+ Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG)),
+ Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG)));
}
- public static long getPartitioningNumCols( JobConf job )
- {
- return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG));
- }
- public static void setPartitioningBlockNumRows( JobConf job, int brlen )
- {
+ public static void setPartitioningBlockNumRows( JobConf job, int brlen ) {
job.set(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG, String.valueOf(brlen));
}
- public static int getPartitioningBlockNumRows( JobConf job )
- {
- return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG));
- }
-
- public static void setPartitioningBlockNumCols( JobConf job, int bclen )
- {
+ public static void setPartitioningBlockNumCols( JobConf job, int bclen ) {
job.set(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG,String.valueOf(bclen));
}
- public static int getPartitioningBlockNumCols( JobConf job )
- {
- return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG));
- }
-
- public static InputInfo getPartitioningInputInfo( JobConf job )
- {
+ public static InputInfo getPartitioningInputInfo( JobConf job ) {
return InputInfo.stringToInputInfo(job.get(PARTITIONING_INPUT_INFO_CONFIG));
}
- public static OutputInfo getPartitioningOutputInfo( JobConf job )
- {
+ public static OutputInfo getPartitioningOutputInfo( JobConf job ) {
return OutputInfo.stringToOutputInfo(job.get(PARTITIONING_OUTPUT_INFO_CONFIG));
}
-
- public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf )
- {
+
+ public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf ) {
job.set(PARTITIONING_OUTPUT_FORMAT_CONFIG, dpf.toString());
}
- public static PDataPartitionFormat getPartitioningFormat( JobConf job )
- {
+ public static PDataPartitionFormat getPartitioningFormat( JobConf job ) {
return PDataPartitionFormat.valueOf(job.get(PARTITIONING_OUTPUT_FORMAT_CONFIG));
}
- public static int getPartitioningSizeN( JobConf job )
- {
+ public static int getPartitioningSizeN( JobConf job ) {
return Integer.parseInt(job.get(PARTITIONING_OUTPUT_N_CONFIG));
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
new file mode 100644
index 0000000..80c7568
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.parfor;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class ParForDataPartitionExecuteTest extends AutomatedTestBase
+{
+ private final static String TEST_NAME = "DataPartitionExecute";
+ private final static String TEST_DIR = "functions/parfor/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + ParForDataPartitionExecuteTest.class.getSimpleName() + "/";
+ private final static double eps = 1e-10;
+
+ private final static int dim1 = 2001;
+ private final static int dim2 = 101;
+ private final static double sparsity1 = 0.7;
+ private final static double sparsity2 = 0.3d;
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"R"}));
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteRowDenseMR() {
+ runFusedDataPartitionExecuteTest(false, true, ExecType.MR);
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteColDenseMR() {
+ runFusedDataPartitionExecuteTest(false, false, ExecType.MR);
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteRowSparseMR() {
+ runFusedDataPartitionExecuteTest(true, true, ExecType.MR);
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteColSparseMR() {
+ runFusedDataPartitionExecuteTest(true, false, ExecType.MR);
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteRowDenseSpark() {
+ runFusedDataPartitionExecuteTest(false, true, ExecType.SPARK);
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteColDenseSpark() {
+ runFusedDataPartitionExecuteTest(false, false, ExecType.SPARK);
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteRowSparseSpark() {
+ runFusedDataPartitionExecuteTest(true, true, ExecType.SPARK);
+ }
+
+ @Test
+ public void testFusedDataPartitionExecuteColSparseSpark() {
+ runFusedDataPartitionExecuteTest(true, false, ExecType.SPARK);
+ }
+
+ private void runFusedDataPartitionExecuteTest(boolean sparse, boolean row, ExecType et)
+ {
+ RUNTIME_PLATFORM platformOld = rtplatform;
+ switch( et ){
+ case MR: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+ case SPARK: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break;
+ default: throw new RuntimeException("Unsupported exec type: "+et.name());
+ }
+ boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ if( et == ExecType.SPARK )
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+ //modify memory budget to trigger fused datapartition-execute
+ long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
+ InfrastructureAnalyzer.setLocalMaxMemory(1*1024*1024); //1MB
+
+ try
+ {
+ int rows = row ? dim2 : dim1;
+ int cols = row ? dim1 : dim2;
+
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ config.addVariable("rows", rows);
+ config.addVariable("cols", cols);
+ loadTestConfiguration(config);
+
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[]{"-stats", "-args", input("X"),
+ String.valueOf(et == ExecType.SPARK).toUpperCase(),
+ String.valueOf(row).toUpperCase(), output("R") };
+
+ fullRScriptName = HOME + TEST_NAME + ".R";
+ rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir()
+ + " " + String.valueOf(row).toUpperCase() + " " + expectedDir();
+
+ //generate input data
+ double sparsity = sparse ? sparsity2 : sparsity1;
+ double[][] X = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
+ writeInputMatrixWithMTD("X", X, true);
+
+ //run test case
+ runTest(true, false, null, -1);
+ runRScript(true);
+
+ //compare matrices
+ HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+ HashMap<CellIndex, Double> rfile = readRMatrixFromFS("R");
+ TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R");
+
+ //check for compiled datapartition-execute
+ Assert.assertTrue(heavyHittersContainsSubString(
+ (et == ExecType.SPARK) ? "ParFor-DPESP" : "MR-Job_ParFor-DPEMR"));
+ }
+ finally {
+ rtplatform = platformOld;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+ InfrastructureAnalyzer.setLocalMaxMemory(oldmem); //restore original memory budget
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.R b/src/test/scripts/functions/parfor/DataPartitionExecute.R
new file mode 100644
index 0000000..8534606
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.R
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+X <- as.matrix(readMM(paste(args[1], "X.mtx", sep="")))
+R = matrix(0, nrow(X), ncol(X));
+
+if( as.logical(args[2]) ) {
+ for(i in 1:nrow(X)) {
+ R[i,] = pmin(X[i,], sum(X[i,]));
+ }
+} else {
+ for(i in 1:ncol(X)) {
+ R[,i] = pmin(X[,i], sum(X[,i]));
+ }
+}
+
+writeMM(as(R, "CsparseMatrix"), paste(args[3], "R", sep=""));
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.dml b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
new file mode 100644
index 0000000..0fd2299
--- /dev/null
+++ b/src/test/scripts/functions/parfor/DataPartitionExecute.dml
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1);
+R = matrix(0, nrow(X), ncol(X));
+
+if( $2 ) {
+ if( $3 )
+ parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+ R[i,] = min(X[i,], sum(X[i,]));
+ else
+ parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK)
+ R[,i] = min(X[,i], sum(X[,i]));
+}
+else {
+ if( $3 )
+ parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+ R[i,] = min(X[i,], sum(X[i,]));
+ else
+ parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR)
+ R[,i] = min(X[,i], sum(X[,i]));
+}
+
+write(R, $4);
http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
index 976b23f..c898a7a 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
ParForAdversarialLiteralsTest.class,
ParForBlockwiseDataPartitioningTest.class,
ParForColwiseDataPartitioningTest.class,
+ ParForDataPartitionExecuteTest.class,
ParForDataPartitionLeftIndexingTest.class,
ParForDependencyAnalysisTest.class,
ParForFunctionSerializationTest.class,