You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/04 21:03:10 UTC
[systemds] 01/02: [SYSTEMDS-3122] Fix parfor degree of parallelism
w/ eval functions
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit e03ef9691d111eca82e8c8bdb0373cd40bdc361e
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Sep 4 22:19:24 2021 +0200
[SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions
Assume the following special-case (but increasingly common) scenario of
three functions fun1, fun2, fun3, where fun1 might be, for example,
hyper-parameter tuning with unknown models/functions. There was an issue
where the parfor optimizer set the degree of parallelism (DOP) to 112, and
then tried to set all reachable program blocks and functions to a DOP of 1.
However, because it encountered an eval with an unknown function call, it
recompiled all existing functions (including fun1) to a DOP of 1 and thus
destroyed its own optimization decisions. This patch now properly fixes
(i.e., freezes) these decisions (for a tree of nested parfor) when
recompiling eval functions.
function fun1()
  parfor(i in 1:n)
    eval("fun2", X, y)
function fun2()
  fun3()
function fun3()
  X = X + 1
On the topk-cleaning pipeline enumeration (run until the hyper-parameter
tuning for the dirty baseline accuracy), this patch improved the end-to-end
runtime from 51s to 11s.
---
.../sysds/runtime/controlprogram/ParForProgramBlock.java | 11 ++++++++++-
.../runtime/controlprogram/paramserv/ParamservUtils.java | 6 ++++--
.../runtime/controlprogram/parfor/opt/OptimizerRuleBased.java | 11 +++++++++--
3 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 42ab8bc..a289218 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -320,6 +320,7 @@ public class ParForProgramBlock extends ForProgramBlock
protected final boolean _monitor;
protected final Level _optLogLevel;
protected int _numThreads = -1;
+ protected boolean _fixedDOP = false; //guard for numThreads
protected long _taskSize = -1;
protected PTaskPartitioner _taskPartitioner = null;
protected PDataPartitioner _dataPartitioner = null;
@@ -471,6 +472,14 @@ public class ParForProgramBlock extends ForProgramBlock
_params.put(ParForStatementBlock.PAR, String.valueOf(_numThreads)); //kept up-to-date for copies
setLocalParWorkerIDs();
}
+
+ public boolean isDegreeOfParallelismFixed() {
+ return _fixedDOP;
+ }
+
+ public void setDegreeOfParallelismFixed(boolean flag) {
+ _fixedDOP = flag;
+ }
public void setCPCaching(boolean flag) {
_enableCPCaching = flag;
@@ -1187,7 +1196,7 @@ public class ParForProgramBlock extends ForProgramBlock
try
{
//create deep copies of required elements child blocks
- ArrayList<ProgramBlock> cpChildBlocks = null;
+ ArrayList<ProgramBlock> cpChildBlocks = null;
HashSet<String> fnNames = new HashSet<>();
if( USE_PB_CACHE )
{
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index da1e9f7..b25c7df 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -309,8 +309,10 @@ public class ParamservUtils {
for (ProgramBlock pb : pbs) {
if (pb instanceof ParForProgramBlock) {
ParForProgramBlock pfpb = (ParForProgramBlock) pb;
- pfpb.setDegreeOfParallelism(k);
- recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
+ if( !pfpb.isDegreeOfParallelismFixed() ) {
+ pfpb.setDegreeOfParallelism(k);
+ recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
+ }
} else if (pb instanceof ForProgramBlock) {
recompiled |= rAssignParallelismAndRecompile(((ForProgramBlock) pb).getChildBlocks(), k, recompiled, forceExecTypeCP);
} else if (pb instanceof WhileProgramBlock) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index cf2c091..b2c82c1 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1178,12 +1178,14 @@ public class OptimizerRuleBased extends Optimizer {
//set parfor degree of parallelism
pfpb.setDegreeOfParallelism(parforK);
+ pfpb.setDegreeOfParallelismFixed(true);
n.setK(parforK);
//distribute remaining parallelism
int remainParforK = getRemainingParallelismParFor(kMax, parforK);
int remainOpsK = getRemainingParallelismOps(_lkmaxCP, parforK);
rAssignRemainingParallelism( n, remainParforK, remainOpsK );
+ pfpb.setDegreeOfParallelismFixed(false);
}
else // ExecType.MR/ExecType.SPARK
{
@@ -1212,7 +1214,9 @@ public class OptimizerRuleBased extends Optimizer {
kMax = 1;
//distribute remaining parallelism and recompile parallel instructions
+ pfpb.setDegreeOfParallelismFixed(true);
rAssignRemainingParallelism( n, kMax, 1 );
+ pfpb.setDegreeOfParallelismFixed(false);
}
_numEvaluatedPlans++;
@@ -1247,14 +1251,15 @@ public class OptimizerRuleBased extends Optimizer {
//set parfor degree of parallelism
long id = c.getID();
c.setK(tmpK);
- ParForProgramBlock pfpb = (ParForProgramBlock)
- _plan.getMappedProgramBlock(id);
+ ParForProgramBlock pfpb = (ParForProgramBlock) _plan.getMappedProgramBlock(id);
pfpb.setDegreeOfParallelism(tmpK);
//distribute remaining parallelism
int remainParforK = getRemainingParallelismParFor(parforK, tmpK);
int remainOpsK = getRemainingParallelismOps(opsK, tmpK);
+ pfpb.setDegreeOfParallelismFixed(true);
rAssignRemainingParallelism(c, remainParforK, remainOpsK);
+ pfpb.setDegreeOfParallelismFixed(false);
}
else if( c.getNodeType() == NodeType.HOP )
{
@@ -1278,6 +1283,8 @@ public class OptimizerRuleBased extends Optimizer {
}
//if parfor contains eval call, make unoptimized functions single-threaded
+ //(parent parfor program blocks have been frozen such that the following
+ //recompilation of all possible functions does not reset the DOP to 1)
if( HopRewriteUtils.isNary(h, OpOpN.EVAL) ) {
ProgramBlock pb = _plan.getMappedProgramBlock(n.getID());
pb.getProgram().getFunctionProgramBlocks(false)