You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/06/06 19:26:07 UTC

[systemds] branch master updated: [SYSTEMDS-2953] Fix function serialization for eval in remote parfor

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 85e8c69  [SYSTEMDS-2953] Fix function serialization for eval in remote parfor
85e8c69 is described below

commit 85e8c69848d1cdb312f8d03f5b3b028b1c88e611
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sun Jun 6 21:17:17 2021 +0200

    [SYSTEMDS-2953] Fix function serialization for eval in remote parfor
    
    This patch fixes the forced recompilation of functions to local CP
    operations if the parfor loop is run in distributed execution mode
    (e.g., via opt=NONE/CONSTRAINED, mode=REMOTE_SPARK). Normally, we
    recompile reachable functions, but with eval (as a second-order
    function) the called functions might be unknown during parfor
    optimization. Accordingly, we now use a conservative approach and
    serialize reachable optimized and unoptimized functions if we encounter
    an eval call in the parfor body or in functions reachable from there.
    This patch also introduces the mechanics for sending multiple
    functions with the same name (optimized and unoptimized) to the remote
    sites, and improves error handling for lazy function loading in eval.
---
 .../apache/sysds/hops/recompile/Recompiler.java    | 45 +++++++++---
 .../runtime/controlprogram/ParForProgramBlock.java | 14 ++--
 .../sysds/runtime/controlprogram/Program.java      |  5 ++
 .../paramserv/FederatedPSControlThread.java        |  7 +-
 .../parfor/opt/OptTreeConverter.java               | 32 ++++-----
 .../instructions/cp/EvalNaryCPInstruction.java     |  8 +++
 .../sysds/runtime/util/ProgramConverter.java       | 83 +++++++++++++---------
 .../builtin/BuiltinNaiveBayesPredictTest.java      | 15 ++--
 .../parfor/misc/ParForRemoteRobustnessTest.java    | 14 +++-
 .../scripts/functions/parfor/parfor_remote3.dml    | 36 ++++++++++
 10 files changed, 181 insertions(+), 78 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index b1d3f29..77e7aa8 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -74,6 +74,7 @@ import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
 import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysds.runtime.controlprogram.Program;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
@@ -84,7 +85,9 @@ import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.OptTreeConverter;
 import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.EvalNaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.IntObject;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
@@ -1017,7 +1020,7 @@ public class Recompiler
 			WhileProgramBlock pbTmp = (WhileProgramBlock)pb;
 			WhileStatementBlock sbTmp = (WhileStatementBlock)pbTmp.getStatementBlock();
 			//recompile predicate
-			if(	sbTmp!=null && !(et==ExecType.CP && !OptTreeConverter.containsSparkInstruction(pbTmp.getPredicate(), true)) )
+			if( sbTmp!=null && !(et==ExecType.CP && !OptTreeConverter.containsSparkInstruction(pbTmp.getPredicate(), true)) )
 				pbTmp.setPredicate( Recompiler.recompileHopsDag2Forced(sbTmp.getPredicateHops(), tid, et) );
 			
 			//recompile body
@@ -1026,7 +1029,7 @@ public class Recompiler
 		}
 		else if (pb instanceof IfProgramBlock)
 		{
-			IfProgramBlock pbTmp = (IfProgramBlock)pb;	
+			IfProgramBlock pbTmp = (IfProgramBlock)pb;
 			IfStatementBlock sbTmp = (IfStatementBlock)pbTmp.getStatementBlock();
 			//recompile predicate
 			if( sbTmp!=null &&!(et==ExecType.CP && !OptTreeConverter.containsSparkInstruction(pbTmp.getPredicate(), true)) )
@@ -1073,24 +1076,46 @@ public class Recompiler
 			if( OptTreeConverter.containsFunctionCallInstruction(bpb) )
 			{
 				ArrayList<Instruction> tmp = bpb.getInstructions();
-				for( Instruction inst : tmp )
+				for( Instruction inst : tmp ) {
 					if( inst instanceof FunctionCallCPInstruction ) {
 						FunctionCallCPInstruction func = (FunctionCallCPInstruction)inst;
 						String fname = func.getFunctionName();
 						String fnamespace = func.getNamespace();
-						String fKey = DMLProgram.constructFunctionKey(fnamespace, fname);
-						
-						if( !fnStack.contains(fKey) ) { //memoization for multiple calls, recursion
-							fnStack.add(fKey);
-							FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fnamespace, fname);
-							rRecompileProgramBlock2Forced(fpb, tid, fnStack, et); //recompile chains of functions
+						rRecompileProgramBlock2Forced(fnamespace, fname, pb.getProgram(), tid, fnStack, et);
+					}
+					else if( inst instanceof EvalNaryCPInstruction ) {
+						CPOperand fname = ((EvalNaryCPInstruction)inst).getInputs()[0];
+						if( fname.isLiteral() ) {
+							rRecompileProgramBlock2Forced(DMLProgram.DEFAULT_NAMESPACE,
+								fname.getName(), pb.getProgram(), tid, fnStack, et);
+						}
+						else {
+							for( String fkey : pb.getProgram().getFunctionProgramBlocks().keySet() )
+								if(!fkey.startsWith(DMLProgram.BUILTIN_NAMESPACE)) {
+									String[] parts = DMLProgram.splitFunctionKey(fkey);
+									rRecompileProgramBlock2Forced(
+										parts[0], parts[1], pb.getProgram(), tid, fnStack, et);
+								}
 						}
 					}
+				}
 			}
 		}
-		
 	}
 
+	private static void rRecompileProgramBlock2Forced(String fnamespace, String fname, Program prog, long tid, HashSet<String> fnStack, ExecType et) {
+		String fKey = DMLProgram.constructFunctionKey(fnamespace, fname);
+		if( !fnStack.contains(fKey) ) { //memoization for multiple calls, recursion
+			fnStack.add(fKey);
+			FunctionProgramBlock fpb = prog.getFunctionProgramBlock(fnamespace, fname, true);
+			rRecompileProgramBlock2Forced(fpb, tid, fnStack, et); //recompile chains of functions
+			if( prog.containsFunctionProgramBlock(fnamespace, fname, false) ) {
+				FunctionProgramBlock fpb2 = prog.getFunctionProgramBlock(fnamespace, fname, false);
+				rRecompileProgramBlock2Forced(fpb2, tid, fnStack, et); //recompile chains of functions
+			}
+		}
+	}
+	
 	/**
 	 * Remove any scalar variables from the variable map if the variable
 	 * is updated in this block.
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 2daa5ee..088f0e9 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -844,9 +844,9 @@ public class ParForProgramBlock extends ForProgramBlock
 		if( FORCE_CP_ON_REMOTE_SPARK && (_optMode == POptMode.NONE 
 			|| (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) ) {
 			//tid = 0  because replaced in remote parworker
-			flagForced = checkMRAndRecompileToCP(0); 
+			flagForced = checkSparkAndRecompileToCP(0);
 		}
-			
+		
 		// Step 1) init parallel workers (serialize PBs)
 		// NOTES: each mapper changes filenames with regard to his ID as we submit a single 
 		// job, cannot reuse serialized string, since variables are serialized as well.
@@ -905,7 +905,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		Timing time = ( _monitor ? new Timing(true) : null );
 		
 		// Step 0) check and compile to CP (if forced remote parfor)
-		boolean flagForced = checkMRAndRecompileToCP(0);
+		boolean flagForced = checkSparkAndRecompileToCP(0);
 		
 		// Step 1) prepare partitioned input matrix (needs to happen before serializing the program)
 		ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
@@ -1346,10 +1346,10 @@ public class ParForProgramBlock extends ForProgramBlock
 	 * @param tid thread id
 	 * @return true if recompile was necessary and possible
 	 */
-	private boolean checkMRAndRecompileToCP(long tid) 
+	private boolean checkSparkAndRecompileToCP(long tid) 
 	{
-		//no MR instructions, ok
-		if( !OptTreeConverter.rContainsMRJobInstruction(this, true) )
+		//no Spark instructions, ok
+		if( !OptTreeConverter.rContainsSparkInstruction(this, true) )
 			return false;
 		
 		//no statement block, failed
@@ -1359,7 +1359,7 @@ public class ParForProgramBlock extends ForProgramBlock
 			return false;
 		}
 		
-		//try recompile MR instructions to CP
+		//try recompile Spark instructions to CP
 		HashSet<String> fnStack = new HashSet<>();
 		Recompiler.recompileProgramBlockHierarchy2Forced(_childBlocks, tid, fnStack, ExecType.CP);
 		return true;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java b/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java
index 6792538..dbde741 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/Program.java
@@ -57,6 +57,11 @@ public class Program
 		return _prog;
 	}
 	
+	public synchronized void addFunctionProgramBlock(String fkey, FunctionProgramBlock fpb, boolean opt) {
+		String[] parts = DMLProgram.splitFunctionKey(fkey);
+		addFunctionProgramBlock(parts[0], parts[1], fpb, opt);
+	}
+	
 	public synchronized void addFunctionProgramBlock(String namespace, String fname, FunctionProgramBlock fpb) {
 		addFunctionProgramBlock(namespace, fname, fpb, true);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
index d286c12..536b529 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/FederatedPSControlThread.java
@@ -150,11 +150,10 @@ public class FederatedPSControlThread extends PSWorker implements Callable<Void>
 			aggProgramBlock.setInstructions(new ArrayList<>(Collections.singletonList(_ps.getAggInst())));
 			pbs.add(aggProgramBlock);
 		}
-
-		boolean opt = _ec.getProgram().getFunctionProgramBlocks(false).isEmpty();
+		
 		programSerialized = InstructionUtils.concatStrings(
 			PROG_BEGIN, NEWLINE,
-			ProgramConverter.serializeProgram(_ec.getProgram(), pbs, new HashMap<>(), opt),
+			ProgramConverter.serializeProgram(_ec.getProgram(), pbs, new HashMap<>()),
 			PROG_END);
 
 		// write program and meta data to worker
@@ -216,7 +215,7 @@ public class FederatedPSControlThread extends PSWorker implements Callable<Void>
 		@Override
 		public FederatedResponse execute(ExecutionContext ec, Data... data) {
 			// parse and set program
-			ec.setProgram(ProgramConverter.parseProgram(_programString, 0, false));
+			ec.setProgram(ProgramConverter.parseProgram(_programString, 0));
 
 			// set variables to ec
 			ec.setVariable(Statement.PS_FED_BATCH_SIZE, new IntObject(_batchSize));
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java
index 8fbdcc1..91a703a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java
@@ -61,6 +61,7 @@ import org.apache.sysds.runtime.controlprogram.parfor.opt.OptNode.NodeType;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.OptNode.ParamType;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.Optimizer.PlanInputType;
 import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.cp.EvalNaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysds.runtime.instructions.cpfile.MatrixIndexingCPFileInstruction;
 import org.apache.sysds.runtime.instructions.spark.SPInstruction;
@@ -539,7 +540,7 @@ public class OptTreeConverter
 		return ret;
 	}
 
-	public static boolean rContainsMRJobInstruction( ProgramBlock pb, boolean inclFunctions )
+	public static boolean rContainsSparkInstruction( ProgramBlock pb, boolean inclFunctions )
 	{
 		boolean ret = false;
 		
@@ -549,7 +550,7 @@ public class OptTreeConverter
 			ret = containsSparkInstruction(tmp.getPredicate(), true);
 			if( ret ) return ret;
 			for (ProgramBlock pb2 : tmp.getChildBlocks()) {
-				ret = rContainsMRJobInstruction(pb2, inclFunctions);
+				ret = rContainsSparkInstruction(pb2, inclFunctions);
 				if( ret ) return ret;
 			}
 		}
@@ -558,11 +559,11 @@ public class OptTreeConverter
 			ret = containsSparkInstruction(tmp.getPredicate(), true);
 			if( ret ) return ret;
 			for( ProgramBlock pb2 : tmp.getChildBlocksIfBody() ){
-				ret = rContainsMRJobInstruction(pb2, inclFunctions);
+				ret = rContainsSparkInstruction(pb2, inclFunctions);
 				if( ret ) return ret;
 			}
 			for( ProgramBlock pb2 : tmp.getChildBlocksElseBody() ){
-				ret = rContainsMRJobInstruction(pb2, inclFunctions);
+				ret = rContainsSparkInstruction(pb2, inclFunctions);
 				if( ret ) return ret;
 			}
 		}
@@ -573,7 +574,7 @@ public class OptTreeConverter
 			ret |= containsSparkInstruction(tmp.getIncrementInstructions(), true);
 			if( ret ) return ret;
 			for( ProgramBlock pb2 : tmp.getChildBlocks() ){
-				ret = rContainsMRJobInstruction(pb2, inclFunctions);
+				ret = rContainsSparkInstruction(pb2, inclFunctions);
 				if( ret ) return ret;
 			}
 		}
@@ -600,17 +601,16 @@ public class OptTreeConverter
 
 	public static boolean containsFunctionCallInstruction( BasicProgramBlock pb ) {
 		return pb.getInstructions().stream()
-			.anyMatch(inst -> inst instanceof FunctionCallCPInstruction);
+			.anyMatch(inst -> inst instanceof FunctionCallCPInstruction
+					|| inst instanceof EvalNaryCPInstruction);
 	}
 
 	public static void replaceProgramBlock(OptNode parent, OptNode n, ProgramBlock pbOld, ProgramBlock pbNew, boolean rtMap) {
 		ProgramBlock pbParent = null;
 		if( rtMap )
 			pbParent = (ProgramBlock)_rtMap.getMappedObject( parent.getID() );
-		else
-		{
-			if( parent.getNodeType()==NodeType.FUNCCALL )
-			{
+		else {
+			if( parent.getNodeType()==NodeType.FUNCCALL ) {
 				FunctionOp fop = (FunctionOp) _hlMap.getMappedHop(parent.getID());
 				pbParent = ((Program)_hlMap.getRootProgram()[1]).getFunctionProgramBlock(fop.getFunctionNamespace(), fop.getFunctionName());
 			}
@@ -618,24 +618,20 @@ public class OptTreeConverter
 				pbParent = (ProgramBlock)_hlMap.getMappedProg( parent.getID() )[1];
 		}
 		
-		if( pbParent instanceof IfProgramBlock )
-		{
+		if( pbParent instanceof IfProgramBlock ) {
 			IfProgramBlock ipb = (IfProgramBlock) pbParent;
 			replaceProgramBlock( ipb.getChildBlocksIfBody(), pbOld, pbNew );
 			replaceProgramBlock( ipb.getChildBlocksElseBody(), pbOld, pbNew );
 		}
-		else if( pbParent instanceof WhileProgramBlock )
-		{
+		else if( pbParent instanceof WhileProgramBlock ) {
 			WhileProgramBlock wpb = (WhileProgramBlock) pbParent;
 			replaceProgramBlock( wpb.getChildBlocks(), pbOld, pbNew );
 		}
-		else if( pbParent instanceof ForProgramBlock || pbParent instanceof ParForProgramBlock )
-		{
+		else if( pbParent instanceof ForProgramBlock || pbParent instanceof ParForProgramBlock ) {
 			ForProgramBlock fpb = (ForProgramBlock) pbParent;
 			replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew );
 		}
-		else if( pbParent instanceof FunctionProgramBlock )
-		{
+		else if( pbParent instanceof FunctionProgramBlock ) {
 			FunctionProgramBlock fpb = (FunctionProgramBlock) pbParent;
 			replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew );
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index 7a0e2ac..0970a37 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -88,6 +88,14 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 			DataType.MATRIX : boundInputs[0].getDataType();
 		String funcName2 = Builtins.getInternalFName(funcName, dt1);
 		if( !ec.getProgram().containsFunctionProgramBlock(nsName, funcName)) {
+			//error handling non-existing functions
+			if( !Builtins.contains(funcName, true, false) //builtins and their private functions
+				&& !ec.getProgram().containsFunctionProgramBlock(DMLProgram.BUILTIN_NAMESPACE, funcName2)) {
+				String msgNs = (nsName==null) ? DMLProgram.DEFAULT_NAMESPACE : nsName;
+				throw new DMLRuntimeException("Function '" 
+					+ DMLProgram.constructFunctionKey(msgNs, funcName)+"' (called through eval) is non-existing.");
+			}
+			//load and compile missing builtin function
 			nsName = DMLProgram.BUILTIN_NAMESPACE;
 			synchronized(ec.getProgram()) { //prevent concurrent recompile/prog modify
 				if( !ec.getProgram().containsFunctionProgramBlock(nsName, funcName2) )
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index fcf93d4..ae27d6c 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -77,6 +77,7 @@ import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.InstructionParser;
 import org.apache.sysds.runtime.instructions.cp.BooleanObject;
 import org.apache.sysds.runtime.instructions.cp.CPInstruction;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.DoubleObject;
 import org.apache.sysds.runtime.instructions.cp.EvalNaryCPInstruction;
@@ -99,7 +100,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.stream.Collectors;
@@ -752,7 +752,7 @@ public class ProgramConverter
 		//handle program
 		builder.append(PROG_BEGIN);
 		builder.append(NEWLINE);
-		builder.append(rSerializeFunctionProgramBlocks(ec.getProgram().getFunctionProgramBlocks(),
+		builder.append(rSerializeFunctionProgramBlocks(ec.getProgram(),
 			new HashSet<>(ec.getProgram().getFunctionProgramBlocks().keySet()), clsMap));
 		builder.append(PROG_END);
 		builder.append(NEWLINE);
@@ -817,7 +817,7 @@ public class ProgramConverter
 		//handle program
 		sb.append(PROG_BEGIN);
 		sb.append( NEWLINE );
-		sb.append( serializeProgram(prog, pbs, clsMap, true) );
+		sb.append( serializeProgram(prog, pbs, clsMap) );
 		sb.append(PROG_END);
 		sb.append( NEWLINE );
 		sb.append( COMPONENTS_DELIM );
@@ -849,46 +849,55 @@ public class ProgramConverter
 		return sb.toString();
 	}
 
-	public static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap, boolean opt) {
+	public static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) {
 		//note program contains variables, programblocks and function program blocks
 		//but in order to avoid redundancy, we only serialize function program blocks
-		HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks(opt);
 		HashSet<String> cand = new HashSet<>();
-		rFindSerializationCandidates(pbs, cand, opt);
-		return rSerializeFunctionProgramBlocks(fpb, cand, clsMap);
+		rFindSerializationCandidates(pbs, cand);
+		return rSerializeFunctionProgramBlocks(prog, cand, clsMap);
 	}
 
-	private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand, boolean opt)
+	private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand)
 	{
 		for( ProgramBlock pb : pbs )
 		{
 			if( pb instanceof WhileProgramBlock ) {
 				WhileProgramBlock wpb = (WhileProgramBlock) pb;
-				rFindSerializationCandidates(wpb.getChildBlocks(), cand, opt);
+				rFindSerializationCandidates(wpb.getChildBlocks(), cand);
 			}
 			else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock ) {
 				ForProgramBlock fpb = (ForProgramBlock) pb; 
-				rFindSerializationCandidates(fpb.getChildBlocks(), cand, opt);
+				rFindSerializationCandidates(fpb.getChildBlocks(), cand);
 			}
 			else if ( pb instanceof IfProgramBlock ) {
 				IfProgramBlock ipb = (IfProgramBlock) pb;
-				rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand, opt);
+				rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand);
 				if( ipb.getChildBlocksElseBody() != null )
-					rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand, opt);
+					rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand);
 			}
 			else if( pb instanceof BasicProgramBlock ) { 
 				BasicProgramBlock bpb = (BasicProgramBlock) pb;
-				for( Instruction inst : bpb.getInstructions() )
+				for( Instruction inst : bpb.getInstructions() ) {
 					if( inst instanceof FunctionCallCPInstruction ) {
 						FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst;
 						String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName());
 						if( !cand.contains(fkey) ) { //memoization for multiple calls, recursion
 							cand.add( fkey ); //add to candidates
 							//investigate chains of function calls
-							FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName(), opt);
-							rFindSerializationCandidates(fpb.getChildBlocks(), cand, opt);
+							FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName());
+							rFindSerializationCandidates(fpb.getChildBlocks(), cand);
 						}
 					}
+					else if(inst instanceof EvalNaryCPInstruction) {
+						CPOperand fname = ((EvalNaryCPInstruction)inst).getInputs()[0];
+						if( fname.isLiteral() )
+							cand.add(DMLProgram.constructFunctionKey(DMLProgram.DEFAULT_NAMESPACE, fname.getName()));
+						else //add all potential targets, other than builtin functions
+							pb.getProgram().getFunctionProgramBlocks().keySet().stream()
+								.filter(s -> !s.startsWith(DMLProgram.BUILTIN_NAMESPACE))
+								.forEach(s -> cand.add(s));
+					}
+				}
 			}
 		}
 	}
@@ -1105,18 +1114,24 @@ public class ProgramConverter
 		return sb.toString();
 	}
 
-	private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) {
+	private static String rSerializeFunctionProgramBlocks(Program prog, HashSet<String> cand, HashMap<String, byte[]> clsMap) {
 		StringBuilder sb = new StringBuilder();
 		int count = 0;
-		for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() ) {
-			if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body
+		for( String fkey : prog.getFunctionProgramBlocks().keySet() ) {
+			if( !cand.contains(fkey) ) //skip function not included in the parfor body
 				continue;
 			if( count>0 ) {
 				sb.append( ELEMENT_DELIM );
 			}
-			sb.append( pb.getKey() );
+			sb.append( fkey );
 			sb.append( KEY_VALUE_DELIM );
-			sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) );
+			FunctionProgramBlock fpb1 = prog.getFunctionProgramBlock(fkey, true);
+			sb.append( rSerializeProgramBlock(fpb1, clsMap) );
+			if( prog.containsFunctionProgramBlock(fkey, false) ) {
+				sb.append( KEY_VALUE_DELIM );
+				FunctionProgramBlock fpb2 = prog.getFunctionProgramBlock(fkey, false);
+				sb.append( rSerializeProgramBlock(fpb2, clsMap) );
+			}
 			count++;
 		}
 		sb.append(NEWLINE);
@@ -1352,19 +1367,9 @@ public class ProgramConverter
 	}
 
 	public static Program parseProgram( String in, int id ) {
-		return parseProgram(in, id, true);
-	}
-
-	public static Program parseProgram( String in, int id, boolean opt ) {
 		String lin = in.substring( PROG_BEGIN.length(),in.length()- PROG_END.length()).trim();
 		Program prog = new Program();
-		HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id);
-		for( Entry<String,FunctionProgramBlock> e : fc.entrySet() ) {
-			String[] keypart = e.getKey().split( Program.KEY_DELIM );
-			String namespace = keypart[0];
-			String name      = keypart[1];
-			prog.addFunctionProgramBlock(namespace, name, e.getValue(), opt);
-		}
+		parseFunctionProgramBlocks(lin, prog, id);
 		return prog;
 	}
 
@@ -1387,9 +1392,21 @@ public class ProgramConverter
 			String lvar  = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use
 			//put first copy into prog (for direct use)
 			int index = lvar.indexOf( KEY_VALUE_DELIM );
-			String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id;
+			String tmp1 = lvar.substring(0, index);
 			String tmp2 = lvar.substring(index + 1);
-			ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
+			if( tmp2.contains(KEY_VALUE_DELIM) ) {
+				int index2 = tmp2.indexOf( KEY_VALUE_DELIM );
+				String tmp21 = tmp2.substring(0, index2);
+				String tmp22 = tmp2.substring(index2 + 1);
+				prog.addFunctionProgramBlock(tmp1,
+					(FunctionProgramBlock)rParseProgramBlock(tmp21, prog, id), true);
+				prog.addFunctionProgramBlock(tmp1,
+					(FunctionProgramBlock)rParseProgramBlock(tmp22, prog, id), false);
+			}
+			else {
+				prog.addFunctionProgramBlock(tmp1,
+					(FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id), true);
+			}
 		}
 		return ret;
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinNaiveBayesPredictTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinNaiveBayesPredictTest.java
index 9318f45..2357d92 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinNaiveBayesPredictTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinNaiveBayesPredictTest.java
@@ -37,24 +37,29 @@ public class BuiltinNaiveBayesPredictTest extends AutomatedTestBase {
 
 	public double eps = 1e-7;
 
-	@Override public void setUp() {
+	@Override
+	public void setUp() {
 		TestUtils.clearAssertionInformation();
 		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"YRaw", "Y"}));
 	}
 
-	@Test public void testSmallDense() {
+	@Test
+	public void testSmallDense() {
 		testNaiveBayesPredict(100, 50, 0.7);
 	}
 
-	@Test public void testLargeDense() {
+	@Test
+	public void testLargeDense() {
 		testNaiveBayesPredict(10000, 750, 0.7);
 	}
 
-	@Test public void testSmallSparse() {
+	@Test
+	public void testSmallSparse() {
 		testNaiveBayesPredict(100, 50, 0.01);
 	}
 
-	@Test public void testLargeSparse() {
+	@Test
+	public void testLargeSparse() {
 		testNaiveBayesPredict(10000, 750, 0.01);
 	}
 
diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
index a1599c6..9d9b268 100644
--- a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
@@ -29,10 +29,11 @@ import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 
-public class ParForRemoteRobustnessTest extends AutomatedTestBase 
+public class ParForRemoteRobustnessTest extends AutomatedTestBase
 {
 	private final static String TEST_NAME1 = "parfor_remote1";
 	private final static String TEST_NAME2 = "parfor_remote2";
+	private final static String TEST_NAME3 = "parfor_remote3";
 	private final static String TEST_DIR = "functions/parfor/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + ParForRemoteRobustnessTest.class.getSimpleName() + "/";
 	private final static double eps = 1e-10;
@@ -45,6 +46,7 @@ public class ParForRemoteRobustnessTest extends AutomatedTestBase
 	public void setUp() {
 		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"Rout"}));
 		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[]{"Rout"}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[]{"Rout"}));
 	}
 
 	@Test
@@ -67,6 +69,16 @@ public class ParForRemoteRobustnessTest extends AutomatedTestBase
 		runParforRemoteTest(TEST_NAME2, ExecMode.HYBRID);
 	}
 	
+	@Test
+	public void testParForRemoteEvalCP() {
+		runParforRemoteTest(TEST_NAME3, ExecMode.SINGLE_NODE);
+	}
+	
+	@Test
+	public void testParForRemoteEvalHybrid() {
+		runParforRemoteTest(TEST_NAME3, ExecMode.HYBRID);
+	}
+	
 	private void runParforRemoteTest( String TEST_NAME, ExecMode type )
 	{
 		TestConfiguration config = getTestConfiguration(TEST_NAME);
diff --git a/src/test/scripts/functions/parfor/parfor_remote3.dml b/src/test/scripts/functions/parfor/parfor_remote3.dml
new file mode 100644
index 0000000..48efba6
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_remote3.dml
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+foo = function(Matrix[Double] X, Double s) return(Matrix[Double] Y) {
+  while(FALSE) {} #prevent inlining
+  Y = as.matrix(sum(X) + 4*s);
+}
+
+A = rand(rows=$2, cols=$3, min=1, max=1);
+
+R = matrix(0, ncol(A), 1)
+parfor(i in 1:ncol(A), mode=REMOTE_SPARK, opt=CONSTRAINED) {
+  Ai = A[,i];
+  R[i,] = t(eval("foo", list(Ai, sum(Ai))));
+}
+
+R2 = as.matrix(sum(R))
+write(R2, $4);