You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/19 07:18:35 UTC
[2/2] incubator-systemml git commit: [SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)
[SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4c74a343
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4c74a343
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4c74a343
Branch: refs/heads/master
Commit: 4c74a34349bd4eeb0f4e102db7bca1f09b2ced97
Parents: 30d4b1e
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Apr 19 00:12:03 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Apr 19 00:12:03 2017 -0700
----------------------------------------------------------------------
.../parfor/opt/OptimizerConstrained.java | 2 +-
.../parfor/opt/OptimizerRuleBased.java | 44 +++++++++++---------
2 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 235b927..fb83fd6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -131,7 +131,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
//exec-type-specific rewrites
- if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK )
+ if( pn.getExecType() == getRemoteExecType() )
{
if( M1 > _rm && M3 <= _rm ) {
// rewrite 1: data partitioning (apply conditional partitioning)
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index f0eb0e7..0ff7a31 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -262,7 +262,7 @@ public class OptimizerRuleBased extends Optimizer
boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
//exec-type-specific rewrites
- if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK )
+ if( pn.getExecType() == getRemoteExecType() )
{
if( M1 > _rm && M3 <= _rm ) {
// rewrite 1: data partitioning (apply conditional partitioning)
@@ -400,6 +400,10 @@ public class OptimizerRuleBased extends Optimizer
_rkmax2 = (int) Math.ceil( PAR_K_FACTOR * _rk2 );
}
+ protected ExecType getRemoteExecType() {
+ return OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
+ }
+
///////
//REWRITE set data partitioner
///
@@ -483,7 +487,7 @@ public class OptimizerRuleBased extends Optimizer
//NOTE: for the moment, we do not partition according to the remote mem, because we can execute
//it even without partitioning in CP. However, advanced optimizers should reason about this
//double mold = h.getMemEstimate();
- if( n.getExecType() == ExecType.MR || n.getExecType()==ExecType.SPARK //Opt Condition: MR/Spark
+ if( n.getExecType() == getRemoteExecType() //Opt Condition: MR/Spark
|| h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning
{
//NOTE: subsequent rewrites will still use the MR mem estimate
@@ -608,23 +612,22 @@ public class OptimizerRuleBased extends Optimizer
ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
//search for candidates
- Collection<OptNode> cand = n.getNodeList(ExecType.MR);
+ Collection<OptNode> cand = n.getNodeList(getRemoteExecType());
//determine if applicable
- boolean apply = M < _rm //ops fit in remote memory budget
- && !cand.isEmpty() //at least one MR
- && isResultPartitionableAll(cand,pfpb.getResultVariables(),vars, pfpb.getIterablePredicateVars()[0]); // check candidates
+ boolean apply = M < _rm //ops fit in remote memory budget
+ && !cand.isEmpty() //at least one MR
+ && isResultPartitionableAll(cand,pfpb.getResultVariables(),
+ vars, pfpb.getIterablePredicateVars()[0]); // check candidates
//recompile LIX
if( apply )
{
- try
- {
+ try {
for(OptNode lix : cand)
recompileLIX( lix, vars );
}
- catch(Exception ex)
- {
+ catch(Exception ex) {
throw new DMLRuntimeException("Unable to recompile LIX.", ex);
}
}
@@ -827,8 +830,9 @@ public class OptimizerRuleBased extends Optimizer
boolean isCPOnlyPossible = isCPOnly || isCPOnlyPossible(n, _rm);
String datapartitioner = n.getParam(ParamType.DATA_PARTITIONER);
- ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
- PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
+ ExecType REMOTE = getRemoteExecType();
+ PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ?
+ PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
//deciding on the execution strategy
if( ConfigurationManager.isParallelParFor() //allowed remote parfor execution
@@ -906,7 +910,7 @@ public class OptimizerRuleBased extends Optimizer
ExecType et = n.getExecType();
boolean ret = ( et == ExecType.CP);
- if( n.isLeaf() && (et == ExecType.MR || et == ExecType.SPARK) )
+ if( n.isLeaf() && et == getRemoteExecType() )
{
Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop( n.getID() );
if( h.getForcedExecType()!=LopProperties.ExecType.MR //e.g., -exec=hadoop
@@ -1156,7 +1160,7 @@ public class OptimizerRuleBased extends Optimizer
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
//decide on the replication factor
- if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK )
+ if( n.getExecType()==getRemoteExecType() )
{
apply = true;
@@ -1417,7 +1421,8 @@ public class OptimizerRuleBased extends Optimizer
{
setTaskPartitioner( pn, PTaskPartitioner.FACTORING_CMAX );
}
- else if( pn.getExecType()==ExecType.MR && !jvmreuse && pn.hasOnlySimpleChilds() )
+ else if( ((pn.getExecType()==ExecType.MR && !jvmreuse)
+ || pn.getExecType()==ExecType.SPARK) && pn.hasOnlySimpleChilds() )
{
//for simple body programs without loops, branches, or function calls, we don't
//expect much load imbalance and hence use static partitioning in order to
@@ -2931,7 +2936,7 @@ public class OptimizerRuleBased extends Optimizer
PResultMerge ret = null;
//investigate details of current parfor node
- boolean flagRemoteParFOR = (n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK);
+ boolean flagRemoteParFOR = (n.getExecType() == getRemoteExecType());
boolean flagLargeResult = hasLargeTotalResults( n, pfpb.getResultVariables(), vars, true );
boolean flagRemoteLeftIndexing = hasResultMRLeftIndexing( n, pfpb.getResultVariables(), vars, true );
boolean flagCellFormatWoCompare = determineFlagCellFormatWoCompare(pfpb.getResultVariables(), vars);
@@ -3015,8 +3020,8 @@ public class OptimizerRuleBased extends Optimizer
{
String opName = n.getParam(ParamType.OPSTRING);
//check opstring and exec type
- if( opName !=null && opName.equals(LeftIndexingOp.OPSTRING) &&
- (n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK) )
+ if( opName != null && opName.equals(LeftIndexingOp.OPSTRING)
+ && n.getExecType() == getRemoteExecType() )
{
LeftIndexingOp hop = (LeftIndexingOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
//check agains set of varname
@@ -3151,7 +3156,7 @@ public class OptimizerRuleBased extends Optimizer
if( n.getNodeType() == NodeType.PARFOR )
{
rewriteSetResultMerge(n, vars, inLocal);
- if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK )
+ if( n.getExecType()==getRemoteExecType() )
inLocal = false;
}
else if( n.getChilds()!=null )
@@ -3493,7 +3498,6 @@ public class OptimizerRuleBased extends Optimizer
return count;
}
-
////////////////////////
// Helper methods //
////////////////////////