You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/05/29 22:17:20 UTC
[systemds] branch master updated: [SYSTEMDS-2989] Fix thread contention in parfor eval function calls
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new c4115fc [SYSTEMDS-2989] Fix thread contention in parfor eval function calls
c4115fc is described below
commit c4115fcc172bb4994fd70e8e746eb46c35007f59
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sun May 30 00:16:55 2021 +0200
[SYSTEMDS-2989] Fix thread contention in parfor eval function calls
In parfor, individual functions are deep copied per worker in order to
prevent thread contention on shared HOP DAGs during recompilation.
However, for functions called through eval (second-order functions),
this up-front per-worker deep copy cannot be done. For that reason,
gridSearch with functions that do not benefit from recompile-once
function compilation (e.g., MLogreg) showed large overhead.
This patch fixes this thread contention issue by also deep-copying the
functions called through eval, but on-demand during eval function calls.
In a gridSearch scenario on a single node (112 vcores) with the Adult
dataset, 3003 hyper-parameter configurations, and MLogreg, this patch
improved end-to-end performance by 3x (from 1,911s to 654s) and
recompilation times by >15x (from 147,624s to 9,318s).
---
.../runtime/controlprogram/ParForProgramBlock.java | 1 +
.../instructions/cp/EvalNaryCPInstruction.java | 25 +++++++++++++++++++++-
.../sysds/runtime/util/ProgramConverter.java | 4 +++-
3 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 16e57da..2daa5ee 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -299,6 +299,7 @@ public class ParForProgramBlock extends ForProgramBlock
public static final boolean LIVEVAR_AWARE_EXPORT = true; // export only read variables according to live variable analysis
public static final boolean RESET_RECOMPILATION_FLAGs = true;
public static boolean ALLOW_BROADCAST_INPUTS = true; // enables to broadcast inputs for remote_spark
+ public static final boolean COPY_EVAL_FUNCTIONS = true; // copy eval functions similar to normal parfor functions
public static final String PARFOR_FNAME_PREFIX = "/parfor/";
public static final String PARFOR_MR_TASKS_TMP_FNAME = PARFOR_FNAME_PREFIX + "%ID%_MR_taskfile";
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index d200bea..7a0e2ac 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -32,6 +32,7 @@ import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.rewrite.HopRewriteUtils;
import org.apache.sysds.hops.rewrite.ProgramRewriter;
+import org.apache.sysds.lops.Lop;
import org.apache.sysds.lops.compile.Dag;
import org.apache.sysds.parser.DMLProgram;
import org.apache.sysds.parser.DMLTranslator;
@@ -39,13 +40,16 @@ import org.apache.sysds.parser.FunctionStatementBlock;
import org.apache.sysds.parser.dml.DmlSyntacticValidator;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.ProgramConverter;
/**
* Eval built-in function instruction
@@ -53,6 +57,8 @@ import org.apache.sysds.runtime.util.DataConverter;
*/
public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
+ private int _threadID = -1;
+
public EvalNaryCPInstruction(Operator op, String opcode, String istr, CPOperand output, CPOperand... inputs) {
super(op, opcode, istr, output, inputs);
}
@@ -93,6 +99,17 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
//obtain function block (but unoptimized version of existing functions for correctness)
FunctionProgramBlock fpb = ec.getProgram().getFunctionProgramBlock(nsName, funcName, false);
+ //copy function block in parfor context (avoid excessive thread contention on recompilation)
+ if( ProgramBlock.isThreadID(_threadID) && ParForProgramBlock.COPY_EVAL_FUNCTIONS ) {
+ String funcNameParfor = funcName + Lop.CP_CHILD_THREAD + _threadID;
+ if( !ec.getProgram().containsFunctionProgramBlock(nsName, funcNameParfor, false) ) { //copy on demand
+ fpb = ProgramConverter.createDeepCopyFunctionProgramBlock(fpb, new HashSet<>(), new HashSet<>());
+ ec.getProgram().addFunctionProgramBlock(nsName, funcNameParfor, fpb, false);
+ }
+ fpb = ec.getProgram().getFunctionProgramBlock(nsName, funcNameParfor, false);
+ funcName = funcNameParfor;
+ }
+
//4. expand list arguments if needed
CPOperand[] boundInputs2 = null;
if( boundInputs.length == 1 && boundInputs[0].getDataType().isList()
@@ -116,7 +133,7 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
FunctionCallCPInstruction fcpi = new FunctionCallCPInstruction(nsName, funcName,
false, boundInputs, fpb.getInputParamNames(), boundOutputNames, "eval func");
fcpi.processInstruction(ec);
-
+
//6. convert the result to matrix
Data newOutput = ec.getVariable(output);
if (!(newOutput instanceof MatrixObject)) {
@@ -141,6 +158,12 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
}
}
+ @Override
+ public void updateInstructionThreadID(String pattern, String replace) {
+ //obtain thread (parfor worker) ID from replacement string
+ _threadID = Integer.parseInt(replace.substring(Lop.CP_CHILD_THREAD.length()));
+ }
+
private static void compileFunctionProgramBlock(String name, DataType dt, Program prog) {
//load builtin file and parse function statement block
String nsName = DMLProgram.BUILTIN_NAMESPACE;
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index f595137..fcf93d4 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -79,6 +79,7 @@ import org.apache.sysds.runtime.instructions.cp.BooleanObject;
import org.apache.sysds.runtime.instructions.cp.CPInstruction;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.DoubleObject;
+import org.apache.sysds.runtime.instructions.cp.EvalNaryCPInstruction;
import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
import org.apache.sysds.runtime.instructions.cp.IntObject;
import org.apache.sysds.runtime.instructions.cp.ListObject;
@@ -1718,7 +1719,8 @@ public class ProgramConverter
* @return instruction
*/
private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) {
- if ( inst instanceof VariableCPInstruction ) { //createvar, setfilename
+ if ( inst instanceof VariableCPInstruction //createvar, setfilename
+ || inst instanceof EvalNaryCPInstruction ) {
//update in-memory representation
inst.updateInstructionThreadID(pattern, replacement);
}