You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/04 21:03:10 UTC

[systemds] 01/02: [SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit e03ef9691d111eca82e8c8bdb0373cd40bdc361e
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Sep 4 22:19:24 2021 +0200

    [SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions
    
    Assume the following special-case (but increasingly common) scenario of
    three functions fun1, fun2, fun3, where fun1 might be, for example,
    hyper-parameter tuning with unknown models/functions. There was an issue
    where the parfor optimizer set the degree of parallelism (DOP) to 112, and
    then tried to set all reachable program blocks and functions to a DOP of 1.
    However, because it encountered an eval with an unknown function call, it
    recompiled all existing functions (including fun1) to DOP 1 and thus
    destroyed its own optimization decisions. This patch now properly fixes
    (i.e., freezes) these decisions (for a tree of nested parfor) when
    recompiling eval functions.
    
    function fun1()
      parfor(i in 1:n)
        eval("fun2", X, y)
    
    function fun2()
      fun3()
    
    function fun3()
      X = X + 1
    
    On the topk-cleaning pipeline enumeration (run up to and including the
    hyper-parameter tuning for the dirty baseline accuracy), this patch
    improved the end-to-end runtime from 51s to 11s.
---
 .../sysds/runtime/controlprogram/ParForProgramBlock.java      | 11 ++++++++++-
 .../runtime/controlprogram/paramserv/ParamservUtils.java      |  6 ++++--
 .../runtime/controlprogram/parfor/opt/OptimizerRuleBased.java | 11 +++++++++--
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 42ab8bc..a289218 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -320,6 +320,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	protected final boolean _monitor;
 	protected final Level _optLogLevel;
 	protected int _numThreads = -1;
+	protected boolean _fixedDOP = false; //guard for numThreads
 	protected long _taskSize = -1;
 	protected PTaskPartitioner _taskPartitioner = null;
 	protected PDataPartitioner _dataPartitioner = null;
@@ -471,6 +472,14 @@ public class ParForProgramBlock extends ForProgramBlock
 		_params.put(ParForStatementBlock.PAR, String.valueOf(_numThreads)); //kept up-to-date for copies
 		setLocalParWorkerIDs();
 	}
+	
+	public boolean isDegreeOfParallelismFixed() {
+		return _fixedDOP;
+	}
+	
+	public void setDegreeOfParallelismFixed(boolean flag) {
+		_fixedDOP = flag;
+	}
 
 	public void setCPCaching(boolean flag) {
 		_enableCPCaching = flag;
@@ -1187,7 +1196,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		try
 		{
 			//create deep copies of required elements child blocks
-			ArrayList<ProgramBlock> cpChildBlocks = null;	
+			ArrayList<ProgramBlock> cpChildBlocks = null;
 			HashSet<String> fnNames = new HashSet<>();
 			if( USE_PB_CACHE )
 			{
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index da1e9f7..b25c7df 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -309,8 +309,10 @@ public class ParamservUtils {
 		for (ProgramBlock pb : pbs) {
 			if (pb instanceof ParForProgramBlock) {
 				ParForProgramBlock pfpb = (ParForProgramBlock) pb;
-				pfpb.setDegreeOfParallelism(k);
-				recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
+				if( !pfpb.isDegreeOfParallelismFixed() ) {
+					pfpb.setDegreeOfParallelism(k);
+					recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
+				}
 			} else if (pb instanceof ForProgramBlock) {
 				recompiled |= rAssignParallelismAndRecompile(((ForProgramBlock) pb).getChildBlocks(), k, recompiled, forceExecTypeCP);
 			} else if (pb instanceof WhileProgramBlock) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index cf2c091..b2c82c1 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1178,12 +1178,14 @@ public class OptimizerRuleBased extends Optimizer {
 			
 			//set parfor degree of parallelism
 			pfpb.setDegreeOfParallelism(parforK);
+			pfpb.setDegreeOfParallelismFixed(true);
 			n.setK(parforK);
 			
 			//distribute remaining parallelism 
 			int remainParforK = getRemainingParallelismParFor(kMax, parforK);
 			int remainOpsK = getRemainingParallelismOps(_lkmaxCP, parforK);
 			rAssignRemainingParallelism( n, remainParforK, remainOpsK );
+			pfpb.setDegreeOfParallelismFixed(false);
 		}
 		else // ExecType.MR/ExecType.SPARK
 		{
@@ -1212,7 +1214,9 @@ public class OptimizerRuleBased extends Optimizer {
 				kMax = 1;
 			
 			//distribute remaining parallelism and recompile parallel instructions
+			pfpb.setDegreeOfParallelismFixed(true);
 			rAssignRemainingParallelism( n, kMax, 1 );
+			pfpb.setDegreeOfParallelismFixed(false);
 		}
 		
 		_numEvaluatedPlans++;
@@ -1247,14 +1251,15 @@ public class OptimizerRuleBased extends Optimizer {
 					//set parfor degree of parallelism
 					long id = c.getID();
 					c.setK(tmpK);
-					ParForProgramBlock pfpb = (ParForProgramBlock) 
-						_plan.getMappedProgramBlock(id);
+					ParForProgramBlock pfpb = (ParForProgramBlock) _plan.getMappedProgramBlock(id);
 					pfpb.setDegreeOfParallelism(tmpK);
 					
 					//distribute remaining parallelism
 					int remainParforK = getRemainingParallelismParFor(parforK, tmpK);
 					int remainOpsK = getRemainingParallelismOps(opsK, tmpK);
+					pfpb.setDegreeOfParallelismFixed(true);
 					rAssignRemainingParallelism(c, remainParforK, remainOpsK);
+					pfpb.setDegreeOfParallelismFixed(false);
 				}
 				else if( c.getNodeType() == NodeType.HOP )
 				{
@@ -1278,6 +1283,8 @@ public class OptimizerRuleBased extends Optimizer {
 					}
 					
 					//if parfor contains eval call, make unoptimized functions single-threaded
+					//(parent parfor program blocks have been frozen such that the following
+					//recompilation of all possible functions does not reset the DOP to 1)
 					if( HopRewriteUtils.isNary(h, OpOpN.EVAL) ) {
 						ProgramBlock pb = _plan.getMappedProgramBlock(n.getID());
 						pb.getProgram().getFunctionProgramBlocks(false)