You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/04/01 23:18:17 UTC

[systemds] branch main updated: [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new b9be8a3  [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration
b9be8a3 is described below

commit b9be8a3da3b3d37fc654c9a4b285209646cb6d97
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Apr 2 01:02:12 2022 +0200

    [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration
    
    This patch makes several improvements to the generality of remote parfor
    jobs, especially with regard to frames and eval function calls and their
    dynamic, on-demand function compilation.
    
    * Robustness for converting nested parfors (for remote spark)
    * Broadcasting of frame inputs to remote parfor workers
    * Handling of function shipping for remote parfor w/ eval
    * Eval in remote parfor with forced load of function scripts
      and pre-materialized DMLProgram as function dictionary
    * Robust loading of function call DAGs w/ partial overlap
      with existing functions
    * Forced single-node execution mode in non-local remote parfor workers to
      prevent spark context creation when loading and compiling functions
    * Handling of local mode (was outdated); now only spark-specific, no longer yarn-specific
    * Bandit builtin script with fix for proper indexing
    * Additional tests for misc issues
---
 scripts/builtin/bandit.dml                         |  6 +-
 src/main/java/org/apache/sysds/hops/Hop.java       |  2 +-
 .../apache/sysds/hops/ParameterizedBuiltinOp.java  |  5 +-
 .../org/apache/sysds/parser/StatementBlock.java    |  2 +-
 .../sysds/parser/dml/DmlSyntacticValidator.java    | 12 +++-
 .../runtime/controlprogram/ParForProgramBlock.java |  3 +-
 .../controlprogram/paramserv/SparkPSWorker.java    |  6 +-
 .../controlprogram/parfor/RemoteDPParForSpark.java |  3 +-
 .../parfor/RemoteDPParForSparkWorker.java          | 19 ++++--
 .../controlprogram/parfor/RemoteParForSpark.java   |  6 +-
 .../parfor/RemoteParForSparkWorker.java            | 18 ++++--
 .../controlprogram/parfor/RemoteParForUtils.java   | 28 +++------
 .../instructions/cp/EvalNaryCPInstruction.java     |  2 +-
 .../cp/ParamservBuiltinCPInstruction.java          |  5 +-
 .../sysds/runtime/util/ProgramConverter.java       | 22 +++----
 .../parfor/misc/ParForEvalBuiltinTest.java         | 73 ++++++++++++++++++++++
 .../scripts/functions/parfor/parfor_eval_local.dml | 29 +++++++++
 .../functions/parfor/parfor_eval_remote.dml        | 29 +++++++++
 .../functions/parfor/parfor_eval_remote2.dml       | 38 +++++++++++
 19 files changed, 245 insertions(+), 63 deletions(-)

diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index 25c826c..dd5b5d8 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -485,8 +485,10 @@ return (Double accuracy, Matrix[Double] evalFunHp, Matrix[Double] hpForPruning,
       [trainX, trainy, testX, testy, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=as.frame(pipList['ph']),
         Xtrain=trainX, Ytrain=trainy, Xtest= testX, Ytest=testy, metaList=metaList, hyperParameters=as.matrix(pipList['hp']), hpForPruning=hpForPruning,
         changesByOp=changesByOp, flagsCount=as.scalar(pipList['flags']), test=TRUE, verbose=FALSE)
-      cvChanges[cvk] = changesByOp
-      allChanges[i] =  changesByPip
+      #TODO double check why this is necessary
+      mincol = min(ncol(cvChanges),ncol(changesByOp))
+      cvChanges[cvk,1:mincol] = changesByOp[,1:mincol];
+      allChanges[i] = changesByPip
     }
     if(changesByPip < ref)
       print("prunning alert 2: no training the model due to minimum changes")
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java b/src/main/java/org/apache/sysds/hops/Hop.java
index af89b4d..344bb30 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -1026,7 +1026,7 @@ public abstract class Hop implements ParseInfo {
 	public abstract Lop constructLops();
 
 	protected final ExecType optFindExecType() {
-		return optFindExecType(OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE ? true : false);
+		return optFindExecType(OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE);
 	}
 	
 	protected abstract ExecType optFindExecType(boolean transitive);
diff --git a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
index 7f9d71e..55e6d79 100644
--- a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
@@ -168,19 +168,17 @@ public class ParameterizedBuiltinOp extends MultiThreadedHop {
 		for (Entry<String, Integer> cur : _paramIndexMap.entrySet())
 			inputlops.put(cur.getKey(), getInput().get(cur.getValue()).constructLops());
 
+		ExecType et = optFindExecType();
 		switch( _op ) {
 			case GROUPEDAGG: { 
-				ExecType et = optFindExecType();
 				constructLopsGroupedAggregate(inputlops, et);
 				break;
 			}
 			case RMEMPTY: {
-				ExecType et = optFindExecType();
 				constructLopsRemoveEmpty(inputlops, et);
 				break;
 			} 
 			case REXPAND: {
-				ExecType et = optFindExecType();
 				constructLopsRExpand(inputlops, et);
 				break;
 			} 
@@ -198,7 +196,6 @@ public class ParameterizedBuiltinOp extends MultiThreadedHop {
 			case PARAMSERV:
 			case LIST:
 			case AUTODIFF:{
-				ExecType et = optFindExecType();
 				ParameterizedBuiltin pbilop = new ParameterizedBuiltin(
 					inputlops, _op, getDataType(), getValueType(), et);
 				setOutputDimensions(pbilop);
diff --git a/src/main/java/org/apache/sysds/parser/StatementBlock.java b/src/main/java/org/apache/sysds/parser/StatementBlock.java
index 6e9545c..1604a28 100644
--- a/src/main/java/org/apache/sysds/parser/StatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/StatementBlock.java
@@ -614,7 +614,7 @@ public class StatementBlock extends LiveVariableAnalysis implements ParseInfo
 				{
 					fdict = prog.createNamespace(DMLProgram.BUILTIN_NAMESPACE);
 					Map<String,FunctionStatementBlock> fsbs = DmlSyntacticValidator
-						.loadAndParseBuiltinFunction(fexpr.getName(), DMLProgram.BUILTIN_NAMESPACE);
+						.loadAndParseBuiltinFunction(fexpr.getName(), DMLProgram.BUILTIN_NAMESPACE, false);
 					for( Entry<String,FunctionStatementBlock> fsb : fsbs.entrySet() ) {
 						if( !fdict.containsFunction(fsb.getKey()) )
 							fdict.addFunction(fsb.getKey(), fsb.getValue());
diff --git a/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java b/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java
index 8c8a449..cbf61fc 100644
--- a/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java
+++ b/src/main/java/org/apache/sysds/parser/dml/DmlSyntacticValidator.java
@@ -638,7 +638,7 @@ public class DmlSyntacticValidator implements DmlListener {
 		}
 	}
 
-	public static Map<String,FunctionStatementBlock> loadAndParseBuiltinFunction(String name, String namespace) {
+	public static Map<String,FunctionStatementBlock> loadAndParseBuiltinFunction(String name, String namespace, boolean forced) {
 		if( !Builtins.contains(name, true, false) ) {
 			throw new DMLRuntimeException("Function "
 				+ DMLProgram.constructFunctionKey(namespace, name)+" is not a builtin function.");
@@ -649,10 +649,12 @@ public class DmlSyntacticValidator implements DmlListener {
 			new CustomErrorListener(), new HashMap<>(), namespace, new HashSet<>());
 		String filePath = Builtins.getFilePath(name);
 		FunctionDictionary<FunctionStatementBlock> dict = tmp
-			.parseAndAddImportedFunctions(namespace, filePath, null)
+			.parseAndAddImportedFunctions(namespace, filePath, null, forced)
 			.getBuiltinFunctionDictionary();
 		
 		//construct output map of all functions
+		if(dict == null)
+			throw new RuntimeException("Failed function load: "+name+" "+namespace);
 		return dict.getFunctions();
 	}
 
@@ -1741,13 +1743,17 @@ public class DmlSyntacticValidator implements DmlListener {
 	}
 	
 	private DMLProgram parseAndAddImportedFunctions(String namespace, String filePath, ParserRuleContext ctx) {
+		return parseAndAddImportedFunctions(namespace, filePath, ctx, false);
+	}
+	
+	private DMLProgram parseAndAddImportedFunctions(String namespace, String filePath, ParserRuleContext ctx, boolean forced) {
 		//validate namespace w/ awareness of dml-bodied builtin functions
 		validateNamespace(namespace, filePath, ctx);
 		
 		//read and parse namespace files
 		String scriptID = DMLProgram.constructFunctionKey(namespace, filePath);
 		DMLProgram prog = null;
-		if (!_f2NS.get().containsKey(scriptID)) {
+		if (forced || !_f2NS.get().containsKey(scriptID) ) {
 			_f2NS.get().put(scriptID, namespace);
 			try {
 				prog = new DMLParserWrapper().doParse(filePath,
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index dc60a79..8abf2e7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -290,6 +290,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	public static final boolean USE_RANGE_TASKS_IF_USEFUL   = true; // use range tasks whenever size>3, false, otherwise wrong split order in remote 
 	public static final boolean USE_STREAMING_TASK_CREATION = true; // start working while still creating tasks, prevents blocking due to too small task queue
 	public static final boolean ALLOW_NESTED_PARALLELISM    = true; // if not, transparently change parfor to for on program conversions (local,remote)
+	public static final boolean CONVERT_NESTED_REMOTE_PARFOR = true; //convert parfor to for in remote parfor
 	public static final boolean USE_PARALLEL_RESULT_MERGE   = false; // if result merge is run in parallel or serial 
 	public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // if remote result merge should be run in parallel for multiple result vars
 	public static final boolean CREATE_UNSCOPED_RESULTVARS  = true;
@@ -1141,7 +1142,7 @@ public class ParForProgramBlock extends ForProgramBlock
 			.map(v -> v._name).collect(Collectors.toSet());
 		Set<String> brVars = inputs.keySet().stream()
 			.filter(v -> !retVars.contains(v))
-			.filter(v -> ec.getVariable(v).getDataType().isMatrix())
+			.filter(v -> ec.getVariable(v).getDataType().isMatrixOrFrame())
 			.filter(v -> OptimizerUtils.estimateSize(ec.getDataCharacteristics(v))< 2.14e9)
 			.collect(Collectors.toSet());
 		return brVars;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java
index 5ae55a3..9e96b45 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/SparkPSWorker.java
@@ -41,6 +41,7 @@ public class SparkPSWorker extends LocalPSWorker implements VoidFunction<Tuple2<
 	private static final long serialVersionUID = -8674739573419648732L;
 
 	private final String _program;
+	private final boolean _isLocal;
 	private final HashMap<String, byte[]> _clsMap;
 	private final SparkConf _conf;
 	private final int _port; // rpc port
@@ -54,13 +55,14 @@ public class SparkPSWorker extends LocalPSWorker implements VoidFunction<Tuple2<
 	private final LongAccumulator _nBatches; //number of executed batches
 	private final LongAccumulator _nEpochs; //number of executed epoches
 
-	public SparkPSWorker(String updFunc, String aggFunc, Statement.PSFrequency freq, int epochs, long batchSize, String program, HashMap<String, byte[]> clsMap, SparkConf conf, int port, LongAccumulator aSetup, LongAccumulator aWorker, LongAccumulator aUpdate, LongAccumulator aIndex, LongAccumulator aGrad, LongAccumulator aRPC, LongAccumulator aBatches, LongAccumulator aEpochs, int nbatches, boolean modelAvg) {
+	public SparkPSWorker(String updFunc, String aggFunc, Statement.PSFrequency freq, int epochs, long batchSize, String program, boolean isLocal, HashMap<String, byte[]> clsMap, SparkConf conf, int port, LongAccumulator aSetup, LongAccumulator aWorker, LongAccumulator aUpdate, LongAccumulator aIndex, LongAccumulator aGrad, LongAccumulator aRPC, LongAccumulator aBatches, LongAccumulator aEpochs, int nbatches, boolean modelAvg) {
 		_updFunc = updFunc;
 		_aggFunc = aggFunc;
 		_freq = freq;
 		_epochs = epochs;
 		_batchSize = batchSize;
 		_program = program;
+		_isLocal = isLocal;
 		_clsMap = clsMap;
 		_conf = conf;
 		_port = port;
@@ -105,7 +107,7 @@ public class SparkPSWorker extends LocalPSWorker implements VoidFunction<Tuple2<
 		_ec = body.getEc();
 
 		// Initialize the buffer pool and register it in the jvm shutdown hook in order to be cleanuped at the end
-		RemoteParForUtils.setupBufferPool(_workerID);
+		RemoteParForUtils.setupBufferPool(_workerID, _isLocal);
 
 		// Create the ps proxy
 		_ps = PSRpcFactory.createSparkPSProxy(_conf, _port, _aRPC);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 6f01f8c..27a11dd 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -75,6 +75,7 @@ public class RemoteDPParForSpark
 		
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 		JavaSparkContext sc = sec.getSparkContext();
+		boolean isLocal = sc.isLocal();
 		
 		//prepare input parameters
 		MatrixObject mo = sec.getMatrixObject(matrixvar);
@@ -90,7 +91,7 @@ public class RemoteDPParForSpark
 		int numReducers2 = Math.max(numReducers, Math.min(numParts, (int)dpf.getNumParts(mc)));
 		
 		//core parfor datapartition-execute (w/ or w/o shuffle, depending on data characteristics)
-		RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, clsMap, 
+		RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, isLocal, clsMap, 
 				matrixvar, itervar, enableCPCaching, mc, tSparseCol, dpf, fmt, aTasks, aIters);
 		JavaPairRDD<Long,Writable> tmp = getPartitionedInput(sec, matrixvar, fmt, dpf);
 		List<Tuple2<Long,String>> out = (requiresGrouping(dpf, mo) ?
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index b4886d2..3526a7c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.codegen.CodegenUtils;
@@ -31,7 +33,6 @@ import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionForma
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.parfor.Task.TaskType;
-import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysds.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
@@ -51,6 +52,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 	private static final long serialVersionUID = 30223759283155139L;
 	
 	private final String  _prog;
+	private final boolean _isLocal;
 	private final HashMap<String, byte[]> _clsMap;
 	private final boolean _caching;
 	private final String _inputVar;
@@ -66,11 +68,12 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 	private final LongAccumulator _aTasks;
 	private final LongAccumulator _aIters;
 	
-	public RemoteDPParForSparkWorker(String program, HashMap<String, byte[]> clsMap, String inputVar, String iterVar,
-		boolean cpCaching, DataCharacteristics mc, boolean tSparseCol, PartitionFormat dpf, FileFormat fmt,
-		LongAccumulator atasks, LongAccumulator aiters)
+	public RemoteDPParForSparkWorker(String program, boolean isLocal, HashMap<String, byte[]> clsMap,
+		String inputVar, String iterVar, boolean cpCaching, DataCharacteristics mc, boolean tSparseCol,
+		PartitionFormat dpf, FileFormat fmt, LongAccumulator atasks, LongAccumulator aiters)
 	{
 		_prog = program;
+		_isLocal = isLocal;
 		_clsMap = clsMap;
 		_caching = cpCaching;
 		_inputVar = inputVar;
@@ -148,14 +151,18 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_numIters    = 0;
 
 		//setup the buffer pool
-		RemoteParForUtils.setupBufferPool(_workerID);
+		RemoteParForUtils.setupBufferPool(_workerID, _isLocal);
 
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		
 		//enable/disable caching (if required and not in CP process)
-		if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
+		if( !_caching && !_isLocal )
 			CacheableData.disableCaching();
+		
+		//ensure local mode for eval function loading on demand
+		if( !_isLocal )
+			DMLScript.setGlobalExecMode(ExecMode.SINGLE_NODE);
 	}
 	
 	/**
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
index 87c1a8a..3057cb5 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -38,7 +38,6 @@ import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
@@ -74,6 +73,7 @@ public class RemoteParForSpark
 		
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 		JavaSparkContext sc = sec.getSparkContext();
+		boolean isLocal = sc.isLocal();
 		
 		//initialize accumulators for tasks/iterations
 		LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
@@ -81,7 +81,7 @@ public class RemoteParForSpark
 		
 		//reset cached shared inputs for correctness in local mode
 		long jobid = _jobID.getNextID();
-		if( InfrastructureAnalyzer.isLocalMode() )
+		if( isLocal )
 			RemoteParForSparkWorker.cleanupCachedVariables(jobid);
 
 		// broadcast the inputs except the result variables
@@ -95,7 +95,7 @@ public class RemoteParForSpark
 		//run remote_spark parfor job 
 		//(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
 		List<Tuple2<Long, String>> out = sc.parallelize(tasks, tasks.size()) //create rdd of parfor tasks
-			.flatMapToPair(new RemoteParForSparkWorker(jobid, prog,
+			.flatMapToPair(new RemoteParForSparkWorker(jobid, prog, isLocal,
 				clsMap, cpCaching, aTasks, aIters, brInputs, topLevelPF, serialLineage))
 			.collect(); //execute and get output handles
 		
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index b002f04..326d2e4 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -34,10 +34,10 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.codegen.CodegenUtils;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.util.CollectionUtils;
 import org.apache.sysds.runtime.util.ProgramConverter;
@@ -52,6 +52,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 	
 	private final long _jobid;
 	private final String _prog;
+	private final boolean _isLocal;
 	private final HashMap<String, byte[]> _clsMap;
 	private boolean _initialized = false;
 	private boolean _caching = true;
@@ -63,12 +64,13 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 
 	private final Map<String, Broadcast<CacheBlock>> _brInputs;
 	
-	public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching,
-			LongAccumulator atasks, LongAccumulator aiters, Map<String, Broadcast<CacheBlock>> brInputs, 
-			boolean cleanCache, Map<String,String> lineage) 
+	public RemoteParForSparkWorker(long jobid, String program, boolean isLocal,
+		HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters,
+		Map<String, Broadcast<CacheBlock>> brInputs, boolean cleanCache, Map<String,String> lineage) 
 	{
 		_jobid = jobid;
 		_prog = program;
+		_isLocal = isLocal;
 		_clsMap = clsMap;
 		_initialized = false;
 		_caching = cpCaching;
@@ -139,15 +141,19 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		reuseVars.reuseVariables(_jobid, _ec.getVariables(), excludeList, _brInputs, _cleanCache);
 		
 		//setup the buffer pool
-		RemoteParForUtils.setupBufferPool(_workerID);
+		RemoteParForUtils.setupBufferPool(_workerID, _isLocal);
 
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		
 		//enable/disable caching (if required and not in CP process)
-		if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
+		if( !_caching && !_isLocal )
 			CacheableData.disableCaching();
 		
+		//ensure local mode for eval function loading on demand
+		if( !_isLocal )
+			DMLScript.setGlobalExecMode(ExecMode.SINGLE_NODE);
+
 		//enable and setup lineage
 		if( _lineage != null ) {
 			DMLScript.LINEAGE = true;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
index b7cce58..33b8284 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -216,23 +216,13 @@ public class RemoteParForUtils
 	/**
 	 * Cleanup all temporary files created by this SystemDS process.
 	 */
-	public static void cleanupWorkingDirectories()
-	{
-		//use the given job configuration for infrastructure analysis (see configure);
-		//this is important for robustness w/ misconfigured classpath which also contains
-		//core-default.xml and hence hides the actual cluster configuration; otherwise
-		//there is missing cleanup of working directories 
-		JobConf job = ConfigurationManager.getCachedJobConf();
-		
-		if( !InfrastructureAnalyzer.isLocalMode(job) )
-		{
-			//delete cache files
-			CacheableData.cleanupCacheDir();
-			//disable caching (prevent dynamic eviction)
-			CacheableData.disableCaching();
-			//cleanup working dir (e.g., of CP_FILE instructions)
-			LocalFileUtils.cleanupWorkingDirectory();
-		}
+	public static void cleanupWorkingDirectories() {
+		//delete cache files
+		CacheableData.cleanupCacheDir();
+		//disable caching (prevent dynamic eviction)
+		CacheableData.disableCaching();
+		//cleanup working dir (e.g., of CP_FILE instructions)
+		LocalFileUtils.cleanupWorkingDirectory();
 	}
 
 	/**
@@ -294,13 +284,13 @@ public class RemoteParForUtils
 	 * @param workerID worker id
 	 * @throws IOException exception
 	 */
-	public static void setupBufferPool(long workerID) throws IOException {
+	public static void setupBufferPool(long workerID, boolean isLocal) throws IOException {
 		//init and register-cleanup of buffer pool (in spark, multiple tasks might
 		//share the process-local, i.e., per executor, buffer pool; hence we synchronize
 		//the initialization and immediately register the created directory for cleanup
 		//on process exit, i.e., executor exit, including any files created in the future.
 		synchronized(CacheableData.class) {
-			if (!CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode()) {
+			if (!CacheableData.isCachingActive() && !isLocal) {
 				//create id, executor working dir, and cache dir
 				String uuid = IDHandler.createDistributedUniqueID();
 				LocalFileUtils.createWorkingDirectoryWithUUID(uuid);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index 82a13bb..46fbeeb 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -210,7 +210,7 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 		//load builtin file and parse function statement block
 		String nsName = DMLProgram.BUILTIN_NAMESPACE;
 		Map<String,FunctionStatementBlock> fsbs = DmlSyntacticValidator
-			.loadAndParseBuiltinFunction(name, nsName);
+			.loadAndParseBuiltinFunction(name, nsName, true); //forced for remote parfor
 		if( fsbs.isEmpty() )
 			throw new DMLRuntimeException("Failed to compile function '"+name+"'.");
 		
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index 67fa582..25353f6 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -246,7 +246,8 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
 
 		// Get driver host
 		String host = sec.getSparkContext().getConf().get("spark.driver.host");
-
+		boolean isLocal = sec.getSparkContext().isLocal();
+		
 		// Create the netty server for ps
 		TransportServer server = PSRpcFactory.createServer(sec.getSparkContext().getConf(),(LocalParamServer) ps, host); // Start the server
 
@@ -271,7 +272,7 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
 
 		// Create remote workers
 		SparkPSWorker worker = new SparkPSWorker(getParam(PS_UPDATE_FUN), getParam(PS_AGGREGATION_FUN),
-			getFrequency(), getEpochs(), getBatchSize(), program, clsMap, sec.getSparkContext().getConf(),
+			getFrequency(), getEpochs(), getBatchSize(), program, isLocal, clsMap, sec.getSparkContext().getConf(),
 			server.getPort(), aSetup, aWorker, aUpdate, aIndex, aGrad, aRPC, aBatch, aEpoch, nbatches, modelAvg);
 
 		if (DMLScript.STATISTICS)
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index c892ffb..f4a9b0e 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -77,7 +77,6 @@ import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.InstructionParser;
 import org.apache.sysds.runtime.instructions.cp.BooleanObject;
 import org.apache.sysds.runtime.instructions.cp.CPInstruction;
-import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.DoubleObject;
 import org.apache.sysds.runtime.instructions.cp.EvalNaryCPInstruction;
@@ -894,13 +893,11 @@ public class ProgramConverter
 						}
 					}
 					else if(inst instanceof EvalNaryCPInstruction) {
-						CPOperand fname = ((EvalNaryCPInstruction)inst).getInputs()[0];
-						if( fname.isLiteral() )
-							cand.add(DMLProgram.constructFunctionKey(DMLProgram.DEFAULT_NAMESPACE, fname.getName()));
-						else //add all potential targets, other than builtin functions
-							pb.getProgram().getFunctionProgramBlocks().keySet().stream()
-								.filter(s -> !s.startsWith(DMLProgram.BUILTIN_NAMESPACE))
-								.forEach(s -> cand.add(s));
+						//add all potential targets, included loaded builtin functions because other 
+						//functions might call them directly (not through eval and thus cannot be loaded)
+						//(even if fname is a known literal, the target function might call other functions)
+						pb.getProgram().getFunctionProgramBlocks().keySet().stream()
+							.forEach(s -> cand.add(s));
 					}
 				}
 			}
@@ -1160,10 +1157,13 @@ public class ProgramConverter
 	private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) {
 		StringBuilder sb = new StringBuilder();
 		
+		boolean pbFOR = pb instanceof ForProgramBlock 
+			&& (!(pb instanceof ParForProgramBlock) || ParForProgramBlock.CONVERT_NESTED_REMOTE_PARFOR);
+		
 		//handle header
 		if( pb instanceof WhileProgramBlock ) 
 			sb.append(PB_WHILE);
-		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
+		else if ( pbFOR )
 			sb.append(PB_FOR);
 		else if ( pb instanceof ParForProgramBlock )
 			sb.append(PB_PARFOR);
@@ -1185,7 +1185,7 @@ public class ProgramConverter
 			sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) );
 			sb.append(PBS_END);
 		}
-		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) ) {
+		else if ( pbFOR ) { // might catch parfor too
 			ForProgramBlock fpb = (ForProgramBlock) pb; 
 			sb.append( fpb.getIterVar() );
 			sb.append( COMPONENTS_DELIM );
@@ -1374,7 +1374,7 @@ public class ProgramConverter
 
 	public static Program parseProgram( String in, int id ) {
 		String lin = in.substring( PROG_BEGIN.length(),in.length()- PROG_END.length()).trim();
-		Program prog = new Program();
+		Program prog = new Program(new DMLProgram());
 		parseFunctionProgramBlocks(lin, prog, id);
 		return prog;
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForEvalBuiltinTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForEvalBuiltinTest.java
new file mode 100644
index 0000000..bfe995b
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForEvalBuiltinTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.parfor.misc;
+
+import org.junit.Test;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+public class ParForEvalBuiltinTest extends AutomatedTestBase 
+{
+	private final static String TEST_NAME1 = "parfor_eval_local";
+	private final static String TEST_NAME2 = "parfor_eval_remote";
+	private final static String TEST_NAME3 = "parfor_eval_remote2";
+	private final static String TEST_DIR = "functions/parfor/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + ParForEvalBuiltinTest.class.getSimpleName() + "/";
+	
+	private final static int rows = 20;
+	private final static int cols = 10;
+	
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"Rout"}) );
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"Rout"}) );
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"Rout"}) );
+	}
+
+	@Test
+	public void testParForEvalLocal() {
+		runFunctionTest(TEST_NAME1);
+	}
+	
+	@Test
+	public void testParForEvalRemote() {
+		runFunctionTest(TEST_NAME2);
+	}
+	
+	@Test
+	public void testParForEvalRemote2() {
+		runFunctionTest(TEST_NAME3);
+	}
+	
+	private void runFunctionTest( String testName ) {
+		TestConfiguration config = getTestConfiguration(testName);
+		config.addVariable("rows", rows);
+		config.addVariable("cols", cols);
+		loadTestConfiguration(config);
+		
+		String HOME = SCRIPT_DIR + TEST_DIR;
+		fullDMLScriptName = HOME + testName + ".dml";
+		programArgs = new String[]{"-args", 
+			Integer.toString(rows), Integer.toString(cols)};
+
+		//expect the script to run without errors during function loading
+		runTest(true, false, null, -1);
+	}
+}
diff --git a/src/test/scripts/functions/parfor/parfor_eval_local.dml b/src/test/scripts/functions/parfor/parfor_eval_local.dml
new file mode 100644
index 0000000..bda33a7
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_eval_local.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=$1, cols=$2)
+parfor(i in 1:10, opt=CONSTRAINED, mode=LOCAL) {
+  s = "outlierBySd";
+  if( i>5 )
+    s = "outlierByIQR";
+  print(toString(sum(eval(s,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}
diff --git a/src/test/scripts/functions/parfor/parfor_eval_remote.dml b/src/test/scripts/functions/parfor/parfor_eval_remote.dml
new file mode 100644
index 0000000..c6bf05e
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_eval_remote.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=$1, cols=$2)
+parfor(i in 1:10, opt=CONSTRAINED, mode=REMOTE_SPARK) {
+  s = "outlierBySd";
+  if( i>5 )
+    s = "outlierByIQR";
+  print(toString(sum(eval(s,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}
diff --git a/src/test/scripts/functions/parfor/parfor_eval_remote2.dml b/src/test/scripts/functions/parfor/parfor_eval_remote2.dml
new file mode 100644
index 0000000..cb0c60f
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_eval_remote2.dml
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=$1, cols=$2)
+
+parfor(i in 1:10, opt=CONSTRAINED, mode=REMOTE_SPARK) {
+  s = "outlierBySd";
+  if( i>5 )
+    s = "outlierByIQR";
+  print(toString(sum(eval(s,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}
+
+parfor(i2 in 1:10, opt=CONSTRAINED, mode=REMOTE_SPARK) {
+  s2 = "outlierBySd";
+  if( i2>5 )
+    s2 = "outlierByIQR";
+  print(toString(sum(eval(s2,
+    list(X=X, k=1.5, repairMethod=1, max_iterations=0, verbose=FALSE)))));
+}