You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/04/05 21:51:16 UTC

[systemds] branch main updated: [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, III

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new e30f0b70df [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, III
e30f0b70df is described below

commit e30f0b70df4d1842439f4372d52239fcfa809743
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Tue Apr 5 23:46:01 2022 +0200

    [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, III
    
    This patch makes the remaining fixes for successfully running key parts
    of the cleaning pipeline enumeration in remote parfor and thus fully
    exploiting the cluster parallelism.
    
    * State reset of loaded functions on parfor worker configuration (when
      sequences of remote parfor loops are executed, the state of loaded
      functions is now properly reset to avoid failures when loading
      functions that call previously loaded functions).
    
    * Cleanup of parfor functions dynamically created via eval function
      calls: so far, such functions were not properly removed from the
      function dictionaries and thus were unnecessarily serialized,
      transferred to remote parfor jobs, and parsed by individual parfor
      workers. In an end-to-end scenario this affected a number of complex
      functions; the cleanup reduces the total parfor execution time by ~9%:
         -> old: 4  ParFor-ESP                      1,544.583        30
         -> new: 4  ParFor-ESP                      1,401.755        30
---
 .../java/org/apache/sysds/parser/DMLProgram.java    |  4 +++-
 .../runtime/controlprogram/ParForProgramBlock.java  | 21 ++++++++++++---------
 .../sysds/runtime/controlprogram/Program.java       |  4 ++++
 .../controlprogram/context/ExecutionContext.java    | 15 +++++++++++++++
 .../parfor/RemoteParForSparkWorker.java             |  5 ++++-
 .../instructions/cp/EvalNaryCPInstruction.java      |  1 +
 6 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/DMLProgram.java b/src/main/java/org/apache/sysds/parser/DMLProgram.java
index 2edffa71b9..99de236651 100644
--- a/src/main/java/org/apache/sysds/parser/DMLProgram.java
+++ b/src/main/java/org/apache/sysds/parser/DMLProgram.java
@@ -238,7 +238,9 @@ public class DMLProgram
 	}
 	
 	public static String constructFunctionKey(String fnamespace, String fname) {
-		return fnamespace + Program.KEY_DELIM + fname;
+		String sfnamespace = fnamespace == null ?
+			DMLProgram.DEFAULT_NAMESPACE : fnamespace;
+		return sfnamespace + Program.KEY_DELIM + fname;
 	}
 	
 	public static String[] splitFunctionKey(String fkey) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 8abf2e7c1b..cc5eb68892 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -819,14 +819,18 @@ public class ParForProgramBlock extends ForProgramBlock
 				numExecutedIterations, numExecutedTasks, localVariables );
 			
 			// Step 5) cleanup local parworkers (e.g., remove created functions)
-			for( int i=0; i<_numThreads; i++ )
-			{
+			for( int i=0; i<_numThreads; i++ ) {
 				Collection<String> fnNames = workers[i].getFunctionNames();
-				if( fnNames!=null && !fnNames.isEmpty() )
-					for( String fn : fnNames ) {
-						String[] parts = DMLProgram.splitFunctionKey(fn);
-						_prog.removeFunctionProgramBlock(parts[0], parts[1]);
-					}
+				if( fnNames!=null ) 
+					fnNames.stream().map(fn -> DMLProgram.splitFunctionKey(fn))
+						.forEach(p -> _prog.removeFunctionProgramBlock(p[0], p[1]));
+				// also cleanup worker-specific functions created via eval on-demand loading
+				workers[i].getExecutionContext().getTmpParforFunctions().stream()
+					.map(fn -> DMLProgram.splitFunctionKey(fn))
+					.forEach(p -> {
+						_prog.getDMLProg().removeFunctionStatementBlock(p[0], p[1]);
+						_prog.removeFunctionProgramBlock(p[0], p[1]);
+					});
 			}
 
 			// Frees up the GPUContexts used in the threaded Parfor and sets
@@ -835,8 +839,7 @@ public class ParForProgramBlock extends ForProgramBlock
 				ec.getGPUContext(0).initializeThread();
 			}
 		}
-		finally 
-		{
+		finally {
 			//remove thread-local memory budget (reset to original budget)
 			//(in finally to prevent error side effects for multiple scripts in one jvm)
 			resetMemoryBudget();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java b/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java
index 66e063a317..806f89318a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java
@@ -90,6 +90,10 @@ public class Program
 		return getFunctionProgramBlocks(true);
 	}
 	
+	public FunctionDictionary<FunctionProgramBlock> getFunctionProgramBlocks(String nsName) {
+		return _namespaces.get(nsName);
+	}
+	
 	public synchronized HashMap<String,FunctionProgramBlock> getFunctionProgramBlocks(boolean opt){
 		HashMap<String,FunctionProgramBlock> retVal = new HashMap<>();
 		for (Entry<String,FunctionDictionary<FunctionProgramBlock>> namespace : _namespaces.entrySet()){
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
index dfad1e14cb..925b34b46d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
@@ -63,7 +63,9 @@ import org.apache.sysds.utils.Statistics;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class ExecutionContext {
@@ -80,6 +82,9 @@ public class ExecutionContext {
 	//lineage map, cache, prepared dedup blocks
 	protected Lineage _lineage;
 
+	//parfor temporary functions (created by eval)
+	protected Set<String> _fnNames;
+	
 	/**
 	 * List of {@link GPUContext}s owned by this {@link ExecutionContext}
 	 */
@@ -96,6 +101,7 @@ public class ExecutionContext {
 		_autoCreateVars = false;
 		_lineage = allocateLineage ? new Lineage() : null;
 		_prog = prog;
+		_fnNames = new HashSet<>();
 	}
 
 	public ExecutionContext(LocalVariableMap vars) {
@@ -103,6 +109,7 @@ public class ExecutionContext {
 		_autoCreateVars = false;
 		_lineage = null;
 		_prog = null;
+		_fnNames = new HashSet<>();
 	}
 
 	public Program getProgram(){
@@ -872,6 +879,14 @@ public class ExecutionContext {
 	private static String getNonExistingVarError(String varname) {
 		return "Variable '" + varname + "' does not exist in the symbol table.";
 	}
+	
+	public void addTmpParforFunction(String fname) {
+		_fnNames.add(fname);
+	}
+	
+	public Set<String> getTmpParforFunctions() {
+		return _fnNames;
+	}
 
 	@Override
 	public String toString(){
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 326d2e40d3..57a2aa06fd 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.parser.dml.DmlSyntacticValidator;
 import org.apache.sysds.runtime.codegen.CodegenUtils;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
@@ -150,9 +151,11 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		if( !_caching && !_isLocal )
 			CacheableData.disableCaching();
 		
-		//ensure local mode for eval function loading on demand
+		//ensure local mode for eval function loading on demand,
+		//and reset thread-local memory of loaded functions (new dictionary)
 		if( !_isLocal )
 			DMLScript.setGlobalExecMode(ExecMode.SINGLE_NODE);
+		DmlSyntacticValidator.init();
 
 		//enable and setup lineage
 		if( _lineage != null ) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index fc6132f09b..6b8f890953 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -123,6 +123,7 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 			if( !ec.getProgram().containsFunctionProgramBlock(nsName, funcNameParfor, false) ) { //copy on demand
 				fpb = ProgramConverter.createDeepCopyFunctionProgramBlock(fpb, new HashSet<>(), new HashSet<>(), _threadID);
 				ec.getProgram().addFunctionProgramBlock(nsName, funcNameParfor, fpb, false);
+				ec.addTmpParforFunction(DMLProgram.constructFunctionKey(nsName, funcNameParfor));
 			}
 			fpb = ec.getProgram().getFunctionProgramBlock(nsName, funcNameParfor, false);
 			funcName = funcNameParfor;