You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/05/29 22:17:20 UTC

[systemds] branch master updated: [SYSTEMDS-2989] Fix thread contention in parfor eval function calls

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c4115fc  [SYSTEMDS-2989] Fix thread contention in parfor eval function calls
c4115fc is described below

commit c4115fcc172bb4994fd70e8e746eb46c35007f59
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sun May 30 00:16:55 2021 +0200

    [SYSTEMDS-2989] Fix thread contention in parfor eval function calls
    
    In parfor, individual functions are deep copied per worker in order to
    prevent thread contention on shared HOP DAGs during recompilation.
    However, if functions are called through eval (second-order functions),
    the called function is only known at runtime, so this setup cannot be
    done upfront. As a result, gridSearch with functions that do not benefit
    from recompile-once function compilation (e.g., MLogreg) showed large
    overhead.
    
    This patch fixes this thread contention issue by also deep-copying the
    functions called through eval, but lazily (on demand) during the eval
    function calls. In a gridSearch scenario on a single node (112 vcores)
    with the Adult dataset, 3,003 hyper-parameter configurations, and
    MLogreg, this patch improved end-to-end performance by ~3x (from 1,911s
    to 654s) and reduced recompilation time by >15x (from 147,624s to
    9,318s, accumulated over all parallel parfor workers).
---
 .../runtime/controlprogram/ParForProgramBlock.java |  1 +
 .../instructions/cp/EvalNaryCPInstruction.java     | 25 +++++++++++++++++++++-
 .../sysds/runtime/util/ProgramConverter.java       |  4 +++-
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 16e57da..2daa5ee 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -299,6 +299,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	public static final boolean LIVEVAR_AWARE_EXPORT        = true; // export only read variables according to live variable analysis
 	public static final boolean RESET_RECOMPILATION_FLAGs   = true;
 	public static       boolean ALLOW_BROADCAST_INPUTS      = true; // enables to broadcast inputs for remote_spark
+	public static final boolean COPY_EVAL_FUNCTIONS         = true; // copy eval functions similar to normal parfor functions
 	
 	public static final String PARFOR_FNAME_PREFIX          = "/parfor/"; 
 	public static final String PARFOR_MR_TASKS_TMP_FNAME    = PARFOR_FNAME_PREFIX + "%ID%_MR_taskfile"; 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index d200bea..7a0e2ac 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -32,6 +32,7 @@ import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.rewrite.HopRewriteUtils;
 import org.apache.sysds.hops.rewrite.ProgramRewriter;
+import org.apache.sysds.lops.Lop;
 import org.apache.sysds.lops.compile.Dag;
 import org.apache.sysds.parser.DMLProgram;
 import org.apache.sysds.parser.DMLTranslator;
@@ -39,13 +40,16 @@ import org.apache.sysds.parser.FunctionStatementBlock;
 import org.apache.sysds.parser.dml.DmlSyntacticValidator;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
 import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.ProgramConverter;
 
 /**
  * Eval built-in function instruction
@@ -53,6 +57,8 @@ import org.apache.sysds.runtime.util.DataConverter;
  */
 public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 
+	private int _threadID = -1;
+	
 	public EvalNaryCPInstruction(Operator op, String opcode, String istr, CPOperand output, CPOperand... inputs) {
 		super(op, opcode, istr, output, inputs);
 	}
@@ -93,6 +99,17 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 		//obtain function block (but unoptimized version of existing functions for correctness)
 		FunctionProgramBlock fpb = ec.getProgram().getFunctionProgramBlock(nsName, funcName, false);
 		
+		//copy function block in parfor context (avoid excessive thread contention on recompilation)
+		if( ProgramBlock.isThreadID(_threadID) && ParForProgramBlock.COPY_EVAL_FUNCTIONS ) {
+			String funcNameParfor = funcName + Lop.CP_CHILD_THREAD + _threadID;
+			if( !ec.getProgram().containsFunctionProgramBlock(nsName, funcNameParfor, false) ) { //copy on demand
+				fpb = ProgramConverter.createDeepCopyFunctionProgramBlock(fpb, new HashSet<>(), new HashSet<>());
+				ec.getProgram().addFunctionProgramBlock(nsName, funcNameParfor, fpb, false);
+			}
+			fpb = ec.getProgram().getFunctionProgramBlock(nsName, funcNameParfor, false);
+			funcName = funcNameParfor;
+		}
+		
 		//4. expand list arguments if needed
 		CPOperand[] boundInputs2 = null;
 		if( boundInputs.length == 1 && boundInputs[0].getDataType().isList()
@@ -116,7 +133,7 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 		FunctionCallCPInstruction fcpi = new FunctionCallCPInstruction(nsName, funcName,
 			false, boundInputs, fpb.getInputParamNames(), boundOutputNames, "eval func");
 		fcpi.processInstruction(ec);
-
+		
 		//6. convert the result to matrix
 		Data newOutput = ec.getVariable(output);
 		if (!(newOutput instanceof MatrixObject)) {
@@ -141,6 +158,12 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 		}
 	}
 	
+	@Override
+	public void updateInstructionThreadID(String pattern, String replace) {
+		//obtain thread (parfor worker) ID from replacement string
+		_threadID = Integer.parseInt(replace.substring(Lop.CP_CHILD_THREAD.length()));
+	}
+	
 	private static void compileFunctionProgramBlock(String name, DataType dt, Program prog) {
 		//load builtin file and parse function statement block
 		String nsName = DMLProgram.BUILTIN_NAMESPACE;
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index f595137..fcf93d4 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -79,6 +79,7 @@ import org.apache.sysds.runtime.instructions.cp.BooleanObject;
 import org.apache.sysds.runtime.instructions.cp.CPInstruction;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.cp.EvalNaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
@@ -1718,7 +1719,8 @@ public class ProgramConverter
 	 * @return instruction
 	 */
 	private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) {
-		if ( inst instanceof VariableCPInstruction ) { //createvar, setfilename
+		if ( inst instanceof VariableCPInstruction //createvar, setfilename
+			|| inst instanceof EvalNaryCPInstruction ) {
 			//update in-memory representation
 			inst.updateInstructionThreadID(pattern, replacement);
 		}