You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/24 20:27:38 UTC
[6/6] incubator-systemml git commit: [SYSTEMML-1336] Improved parfor
optimizer (conditional partitioning)
[SYSTEMML-1336] Improved parfor optimizer (conditional partitioning)
This patch improves the parfor optimizer to consider what-if scenarios
with conditional partitioning. This avoids falling back to local parfor
plans with a small degree of parallelism (if the data barely fits in the
driver) in cases where we could have applied a fused partition-execute
parfor job instead.
For example, on perftest 8GB univariate-stats, it improved the
end-to-end runtime (incl. Spark context creation and I/O) from 781s to
110s.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2f7fa8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2f7fa8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2f7fa8d7
Branch: refs/heads/master
Commit: 2f7fa8d73fa9680df283444627209a31c5ef4acd
Parents: 35da413
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Feb 23 22:19:24 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Feb 24 12:27:29 2017 -0800
----------------------------------------------------------------------
.../parfor/opt/CostEstimator.java | 26 +++++--
.../controlprogram/parfor/opt/OptNode.java | 2 +
.../parfor/opt/OptimizerConstrained.java | 37 ++++++----
.../parfor/opt/OptimizerRuleBased.java | 75 +++++++++++---------
4 files changed, 90 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
index bb3ca88..3fdf8bd 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
@@ -55,6 +55,8 @@ public abstract class CostEstimator
SPARSE
}
+ protected boolean _inclCondPart = false;
+
/**
* Main leaf node estimation method - to be overwritten by specific cost estimators
*
@@ -88,6 +90,7 @@ public abstract class CostEstimator
*
* @param measure ?
* @param node internal representation of a plan alternative for program blocks and instructions
+ * @param inclCondPart including conditional partitioning
* @return estimate?
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@@ -97,13 +100,26 @@ public abstract class CostEstimator
return getEstimate(measure, node, null);
}
+ public double getEstimate( TestMeasure measure, OptNode node, boolean inclCondPart )
+ throws DMLRuntimeException
+ {
+ //temporarily change local flag and get estimate
+ boolean oldInclCondPart = _inclCondPart;
+ _inclCondPart = inclCondPart;
+ double val = getEstimate(measure, node, null);
+
+ //reset local flag and return
+ _inclCondPart = oldInclCondPart;
+ return val;
+ }
+
/**
* Main estimation method.
*
- * @param measure ?
- * @param node internal representation of a plan alternative for program blocks and instructions
+ * @param measure estimate type (time or memory)
+ * @param node plan opt tree node
* @param et execution type
- * @return estimate?
+ * @return estimate
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public double getEstimate( TestMeasure measure, OptNode node, ExecType et )
@@ -113,7 +129,9 @@ public abstract class CostEstimator
if( node.isLeaf() )
{
- if( et != null )
+ if( _inclCondPart && node.getParam(ParamType.DATA_PARTITION_COND_MEM) != null )
+ val = Double.parseDouble(node.getParam(ParamType.DATA_PARTITION_COND_MEM));
+ else if( et != null )
val = getLeafNodeEstimate(measure, node, et); //forced type
else
val = getLeafNodeEstimate(measure, node); //default
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
index 7968c6a..26c30d4 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
@@ -71,6 +71,8 @@ public class OptNode
TASK_SIZE,
DATA_PARTITIONER,
DATA_PARTITION_FORMAT,
+ DATA_PARTITION_COND,
+ DATA_PARTITION_COND_MEM,
RESULT_MERGE,
NUM_ITERATIONS,
RECURSIVE_CALL
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 39e742f..6edcec3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -80,7 +80,6 @@ public class OptimizerConstrained extends OptimizerRuleBased
LOG.debug("--- "+getOptMode()+" OPTIMIZER -------");
OptNode pn = plan.getRoot();
- double M0 = -1, M1 = -1, M2 = -1; //memory consumption
//early abort for empty parfor body
if( pn.isLeaf() )
@@ -100,35 +99,45 @@ public class OptimizerConstrained extends OptimizerRuleBased
ExecType oldET = pn.getExecType();
int oldK = pn.getK();
pn.setSerialParFor(); //for basic mem consumption
- M0 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
+ double M0a = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
pn.setExecType(oldET);
pn.setK(oldK);
- LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0) );
+ LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0a) );
//OPTIMIZE PARFOR PLAN
// rewrite 1: data partitioning (incl. log. recompile RIX)
HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>();
- rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices );
- M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
+ rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() );
+ double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
// rewrite 2: remove unnecessary compare matrix
rewriteRemoveUnnecessaryCompareMatrix(pn, ec);
// rewrite 3: rewrite result partitioning (incl. log/phy recompile LIX)
- boolean flagLIX = super.rewriteSetResultPartitioning( pn, M1, ec.getVariables() );
- M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
- M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
+ boolean flagLIX = super.rewriteSetResultPartitioning( pn, M0b, ec.getVariables() );
+ double M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) );
+
+ //determine memory consumption for what-if: all-cp or partitioned
+ double M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) );
-
+ double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true);
+ LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) );
+
// rewrite 4: execution strategy
PExecMode tmpmode = getPExecMode(pn); //keep old
- boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0, M1, M2, flagLIX );
+ boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
//exec-type-specific rewrites
if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK )
{
+ if( M1 > _rm && M3 <= _rm ) {
+ // rewrite 1: data partitioning (apply conditional partitioning)
+ rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, M3 );
+ M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
+ }
+
if( flagRecompMR ){
//rewrite 5: set operations exec type
rewriteSetOperationsExecType( pn, flagRecompMR );
@@ -221,7 +230,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
///
@Override
- protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PDataPartitionFormat> partitionedMatrices)
+ protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PDataPartitionFormat> partitionedMatrices, double thetaM)
throws DMLRuntimeException
{
boolean blockwise = false;
@@ -235,7 +244,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result="+n.getParam(ParamType.DATA_PARTITIONER) );
}
else
- super.rewriteSetDataPartitioner(n, vars, partitionedMatrices);
+ super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
return blockwise;
}
@@ -246,7 +255,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
///
@Override
- protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, boolean flagLIX)
+ protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, double M3, boolean flagLIX)
throws DMLRuntimeException
{
boolean ret = false;
@@ -270,7 +279,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
LOG.debug(getOptMode()+" OPT: forced 'set execution strategy' - result="+mode );
}
else
- ret = super.rewriteSetExecutionStategy(n, M0, M, M2, flagLIX);
+ ret = super.rewriteSetExecutionStategy(n, M0, M, M2, M3, flagLIX);
return ret;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index bbe5bf7..87cabaa 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -216,7 +216,6 @@ public class OptimizerRuleBased extends Optimizer
LOG.debug("--- "+getOptMode()+" OPTIMIZER -------");
OptNode pn = plan.getRoot();
- double M0 = -1, M1 = -1, M2 = -1; //memory consumption
//early abort for empty parfor body
if( pn.isLeaf() )
@@ -234,32 +233,42 @@ public class OptimizerRuleBased extends Optimizer
//ESTIMATE memory consumption
pn.setSerialParFor(); //for basic mem consumption
- M0 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
- LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0) );
+ double M0a = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
+ LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0a) );
//OPTIMIZE PARFOR PLAN
- // rewrite 1: data partitioning (incl. log. recompile RIX)
+ // rewrite 1: data partitioning (incl. log. recompile RIX and flag opt nodes)
HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>();
- rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices );
- M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
+ rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() );
+ double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
// rewrite 2: remove unnecessary compare matrix (before result partitioning)
rewriteRemoveUnnecessaryCompareMatrix(pn, ec);
// rewrite 3: rewrite result partitioning (incl. log/phy recompile LIX)
- boolean flagLIX = rewriteSetResultPartitioning( pn, M1, ec.getVariables() );
- M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
- M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
+ boolean flagLIX = rewriteSetResultPartitioning( pn, M0b, ec.getVariables() );
+ double M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) );
+
+ //determine memory consumption for what-if: all-cp or partitioned
+ double M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) );
+ double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true);
+ LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) );
// rewrite 4: execution strategy
- boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0, M1, M2, flagLIX );
+ boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
//exec-type-specific rewrites
if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK )
{
+ if( M1 > _rm && M3 <= _rm ) {
+ // rewrite 1: data partitioning (apply conditional partitioning)
+ rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, M3 );
+ M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
+ }
+
if( flagRecompMR ){
//rewrite 5: set operations exec type
rewriteSetOperationsExecType( pn, flagRecompMR );
@@ -390,7 +399,7 @@ public class OptimizerRuleBased extends Optimizer
//REWRITE set data partitioner
///
- protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices )
+ protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices, double thetaM )
throws DMLRuntimeException
{
if( n.getNodeType() != NodeType.PARFOR )
@@ -414,16 +423,15 @@ public class OptimizerRuleBased extends Optimizer
for( String c : cand )
{
PDataPartitionFormat dpf = pfsb.determineDataPartitionFormat( c );
- //System.out.println("Partitioning Format: "+dpf);
+
if( dpf != PDataPartitionFormat.NONE
- && dpf != PDataPartitionFormat.BLOCK_WISE_M_N ) //FIXME
+ && dpf != PDataPartitionFormat.BLOCK_WISE_M_N )
{
cand2.put( c, dpf );
- }
-
+ }
}
- apply = rFindDataPartitioningCandidates(n, cand2, vars);
+ apply = rFindDataPartitioningCandidates(n, cand2, vars, thetaM);
if( apply )
partitionedMatrices.putAll(cand2);
}
@@ -447,7 +455,7 @@ public class OptimizerRuleBased extends Optimizer
return blockwise;
}
- protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars )
+ protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars, double thetaM )
throws DMLRuntimeException
{
boolean ret = false;
@@ -456,7 +464,7 @@ public class OptimizerRuleBased extends Optimizer
{
for( OptNode cn : n.getChilds() )
if( cn.getNodeType() != NodeType.FUNCCALL ) //prevent conflicts with aliases
- ret |= rFindDataPartitioningCandidates( cn, cand, vars );
+ ret |= rFindDataPartitioningCandidates( cn, cand, vars, thetaM );
}
else if( n.getNodeType()== NodeType.HOP
&& n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) )
@@ -470,20 +478,22 @@ public class OptimizerRuleBased extends Optimizer
//NOTE: for the moment, we do not partition according to the remote mem, because we can execute
//it even without partitioning in CP. However, advanced optimizers should reason about this
//double mold = h.getMemEstimate();
- if( n.getExecType() == ExecType.MR || n.getExecType()==ExecType.SPARK ) //Opt Condition: MR/Spark
- // || (mold > _rm && mnew <= _rm) ) //Opt Condition: non-MR special cases (for remote exec)
+ if( n.getExecType() == ExecType.MR || n.getExecType()==ExecType.SPARK //Opt Condition: MR/Spark
+ || h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning
{
//NOTE: subsequent rewrites will still use the MR mem estimate
//(guarded by subsequent operations that have at least the memory req of one partition)
- //if( mnew < _lm ) //apply rewrite if partitions fit into memory
- // n.setExecType(ExecType.CP);
- //else
- // n.setExecType(ExecType.CP); //CP_FILE, but hop still in MR
- n.setExecType(ExecType.CP);
+ n.setExecType(ExecType.CP); //partition ref only (see below)
n.addParam(ParamType.DATA_PARTITION_FORMAT, dpf.toString());
h.setMemEstimate( mnew ); //CP vs CP_FILE in ProgramRecompiler bases on mem_estimate
ret = true;
}
+ //keep track of nodes that allow conditional data partitioning and their mem
+ else
+ {
+ n.addParam(ParamType.DATA_PARTITION_COND, String.valueOf(true));
+ n.addParam(ParamType.DATA_PARTITION_COND_MEM, String.valueOf(mnew));
+ }
}
}
@@ -803,7 +813,7 @@ public class OptimizerRuleBased extends Optimizer
//REWRITE set execution strategy
///
- protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, boolean flagLIX)
+ protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, double M3, boolean flagLIX)
throws DMLRuntimeException
{
boolean isCPOnly = n.isCPOnly();
@@ -814,26 +824,27 @@ public class OptimizerRuleBased extends Optimizer
PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
//deciding on the execution strategy
- if( ConfigurationManager.isParallelParFor() //allowed remote parfor execution
- && ( (isCPOnly && M <= _rm ) //Required: all instruction can be be executed in CP
- ||(isCPOnlyPossible && M2 <= _rm)) ) //Required: cp inst fit into remote JVM mem
+ if( ConfigurationManager.isParallelParFor() //allowed remote parfor execution
+ && ( (isCPOnly && M <= _rm ) //Required: all inst already in cp and fit in remote mem
+ ||(isCPOnly && M3 <= _rm ) //Required: all inst already in cp and fit partitioned in remote mem
+ ||(isCPOnlyPossible && M2 <= _rm)) ) //Required: all inst forced to cp fit in remote mem
{
//at this point all required conditions for REMOTE_MR given, now its an opt decision
int cpk = (int) Math.min( _lk, Math.floor( _lm / M ) ); //estimated local exploited par
//MR if local par cannot be exploited due to mem constraints (this implies that we work on large data)
//(the factor of 2 is to account for hyper-threading and in order prevent too eager remote parfor)
- if( 2*cpk < _lk && 2*cpk < _N && 2*cpk < _rk )
+ if( 2*cpk < _lk && 2*cpk < _N && 2*cpk < _rk ) //incl conditional partitioning
{
n.setExecType( REMOTE ); //remote parfor
}
//MR if problem is large enough and remote parallelism is larger than local
- else if( _lk < _N && _lk < _rk && isLargeProblem(n, M0) )
+ else if( _lk < _N && _lk < _rk && M <= _rm && isLargeProblem(n, M0) )
{
n.setExecType( REMOTE ); //remote parfor
}
//MR if MR operations in local, but CP only in remote (less overall MR jobs)
- else if( (!isCPOnly) && isCPOnlyPossible )
+ else if( !isCPOnly && isCPOnlyPossible )
{
n.setExecType( REMOTE ); //remote parfor
}