You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/07/16 05:04:48 UTC

[1/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function shipping and worker setup

Repository: systemml
Updated Branches:
  refs/heads/master 614adecaf -> cffefca30


http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java
new file mode 100644
index 0000000..2441116
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java
@@ -0,0 +1,47 @@
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import org.apache.sysml.api.DMLException;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.junit.Test;
+
+/**
+ * Integration test that runs the paramserv builtin with mode REMOTE_SPARK
+ * on the SPARK runtime platform (local spark config).
+ *
+ * NOTE(review): unlike the sibling SerializationTest, this new file is
+ * missing the standard ASF license header — should be added before merge.
+ */
+public class ParamservSparkNNTest extends AutomatedTestBase {
+
+	// DML script name (resolved as SCRIPT_DIR + TEST_DIR + name + ".dml"):
+	// BSP update style, per-batch frequency, disjoint-contiguous partitioning
+	private static final String TEST_NAME1 = "paramserv-spark-nn-bsp-batch-dc";
+
+	private static final String TEST_DIR = "functions/paramserv/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + ParamservSparkNNTest.class.getSimpleName() + "/";
+
+	@Override
+	public void setUp() {
+		// no registered output variables: the test only checks execution behavior
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {}));
+	}
+
+	@Test
+	public void testParamservBSPBatchDisjointContiguous() {
+		runDMLTest(TEST_NAME1);
+	}
+
+	/**
+	 * Runs the given DML script on the SPARK runtime platform with the local
+	 * spark config, restoring the previous global settings afterwards.
+	 *
+	 * @param testname name of the test configuration / DML script to run
+	 */
+	private void runDMLTest(String testname) {
+		// save the global runtime settings so they can be restored in finally
+		DMLScript.RUNTIME_PLATFORM oldRtplatform = AutomatedTestBase.rtplatform;
+		boolean oldUseLocalSparkConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		AutomatedTestBase.rtplatform = DMLScript.RUNTIME_PLATFORM.SPARK;
+		DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		try {
+			TestConfiguration config = getTestConfiguration(testname);
+			loadTestConfiguration(config);
+			programArgs = new String[] { "-explain" };
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + testname + ".dml";
+			// The spark paramserv feature is not finished yet, so a DMLException
+			// (currently surfacing as an NPE) is the expected outcome for now
+			runTest(true, true, DMLException.class, null, -1);
+		} finally {
+			// restore global settings regardless of the test outcome
+			AutomatedTestBase.rtplatform = oldRtplatform;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldUseLocalSparkConfig;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java
new file mode 100644
index 0000000..2a08ca6
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.instructions.cp.ListObject;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for (de)serialization of {@code ListObject} via
+ * {@code ProgramConverter.serializeDataObject} / {@code parseDataObject},
+ * covering both unnamed and named lists of matrix and scalar entries.
+ */
+public class SerializationTest {
+
+	@Test
+	public void serializeUnnamedListObject() {
+		MatrixObject mo1 = generateDummyMatrix(10);
+		MatrixObject mo2 = generateDummyMatrix(20);
+		IntObject io = new IntObject(30);
+		ListObject lo = new ListObject(Arrays.asList(mo1, mo2, io));
+		// round-trip: serialize to string, parse back; parseDataObject returns
+		// a (name, data) pair, so index 1 holds the reconstructed ListObject
+		String serial = ProgramConverter.serializeDataObject("key", lo);
+		Object[] obj = ProgramConverter.parseDataObject(serial);
+		ListObject actualLO = (ListObject) obj[1];
+		MatrixObject actualMO1 = (MatrixObject) actualLO.slice(0);
+		MatrixObject actualMO2 = (MatrixObject) actualLO.slice(1);
+		IntObject actualIO = (IntObject) actualLO.slice(2);
+		// delta 0: values are written as exact doubles, so exact compare is safe
+		// NOTE(review): acquireRead() without a matching release — confirm this
+		// is acceptable for short-lived test objects
+		Assert.assertArrayEquals(mo1.acquireRead().getDenseBlockValues(), actualMO1.acquireRead().getDenseBlockValues(), 0);
+		Assert.assertArrayEquals(mo2.acquireRead().getDenseBlockValues(), actualMO2.acquireRead().getDenseBlockValues(), 0);
+		Assert.assertEquals(io.getLongValue(), actualIO.getLongValue());
+	}
+
+	@Test
+	public void serializeNamedListObject() {
+		MatrixObject mo1 = generateDummyMatrix(10);
+		MatrixObject mo2 = generateDummyMatrix(20);
+		IntObject io = new IntObject(30);
+		// same as above, but with element names that must survive the round-trip
+		ListObject lo = new ListObject(Arrays.asList(mo1, mo2, io), Arrays.asList("e1", "e2", "e3"));
+
+		String serial = ProgramConverter.serializeDataObject("key", lo);
+		Object[] obj = ProgramConverter.parseDataObject(serial);
+		ListObject actualLO = (ListObject) obj[1];
+		MatrixObject actualMO1 = (MatrixObject) actualLO.slice(0);
+		MatrixObject actualMO2 = (MatrixObject) actualLO.slice(1);
+		IntObject actualIO = (IntObject) actualLO.slice(2);
+		Assert.assertEquals(lo.getNames(), actualLO.getNames());
+		Assert.assertArrayEquals(mo1.acquireRead().getDenseBlockValues(), actualMO1.acquireRead().getDenseBlockValues(), 0);
+		Assert.assertArrayEquals(mo2.acquireRead().getDenseBlockValues(), actualMO2.acquireRead().getDenseBlockValues(), 0);
+		Assert.assertEquals(io.getLongValue(), actualIO.getLongValue());
+	}
+
+	/**
+	 * Creates a dense matrix object with values 0..size-1 and exports it to
+	 * disk, so that serialization has backing data to reference.
+	 *
+	 * @param size number of cells (values 0, 1, ..., size-1)
+	 * @return exported matrix object
+	 */
+	private MatrixObject generateDummyMatrix(int size) {
+		double[] dl = new double[size];
+		for (int i = 0; i < size; i++) {
+			dl[i] = i;
+		}
+		MatrixObject result = ParamservUtils.newMatrixObject(DataConverter.convertToMatrixBlock(dl, true));
+		result.exportData();
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
index a4eab06..40f5a1b 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 
 import org.junit.Test;
 
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
@@ -123,7 +123,7 @@ public class ParForAdversarialLiteralsTest extends AutomatedTestBase
 		// This is for running the junit test the new way, i.e., construct the arguments directly 
 		String HOME = SCRIPT_DIR + TEST_DIR;
 		String IN = "A";
-		String OUT = (testName.equals(TEST_NAME1a)||testName.equals(TEST_NAME1b))?ProgramConverter.CP_ROOT_THREAD_ID:"B";
+		String OUT = (testName.equals(TEST_NAME1a)||testName.equals(TEST_NAME1b))?Lop.CP_ROOT_THREAD_ID:"B";
 
 		fullDMLScriptName = HOME + TEST_NAME + ".dml";
 		programArgs = new String[]{"-args", input(IN),
@@ -132,7 +132,7 @@ public class ParForAdversarialLiteralsTest extends AutomatedTestBase
 		fullRScriptName = HOME + TEST_NAME + ".R";
 		rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir();
 		
-        double[][] A = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
+		double[][] A = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
 		writeInputMatrix("A", A, false);
 
 		boolean exceptionExpected = false;
@@ -141,8 +141,7 @@ public class ParForAdversarialLiteralsTest extends AutomatedTestBase
 		//compare matrices
 		HashMap<CellIndex, Double> dmlin = TestUtils.readDMLMatrixFromHDFS(input(IN));
 		HashMap<CellIndex, Double> dmlout = readDMLMatrixFromHDFS(OUT); 
-				
-		TestUtils.compareMatrices(dmlin, dmlout, eps, "DMLin", "DMLout");			
+		
+		TestUtils.compareMatrices(dmlin, dmlout, eps, "DMLin", "DMLout");
 	}
-	
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml b/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
index acafc88..84095ec 100644
--- a/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
+++ b/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
@@ -36,7 +36,7 @@ source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
 train = function(matrix[double] X, matrix[double] Y,
                  matrix[double] X_val, matrix[double] Y_val,
                  int C, int Hin, int Win, int epochs, int workers,
-                 string utype, string freq, int batchsize, string scheme)
+                 string utype, string freq, int batchsize, string scheme, string mode)
     return (matrix[double] W1, matrix[double] b1,
             matrix[double] W2, matrix[double] b2,
             matrix[double] W3, matrix[double] b3,
@@ -108,7 +108,7 @@ train = function(matrix[double] X, matrix[double] Y,
   params = list(lr=lr, mu=mu, decay=decay, C=C, Hin=Hin, Win=Win, Hf=Hf, Wf=Wf, stride=stride, pad=pad, lambda=lambda, F1=F1, F2=F2, N3=N3)
 
   # Use paramserv function
-  modelList2 = paramserv(model=modelList, features=X, labels=Y, val_features=X_val, val_labels=Y_val, upd="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::gradients", agg="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::aggregation", mode="LOCAL", utype=utype, freq=freq, epochs=epochs, batchsize=batchsize, k=workers, scheme=scheme, hyperparams=params, checkpointing="NONE")
+  modelList2 = paramserv(model=modelList, features=X, labels=Y, val_features=X_val, val_labels=Y_val, upd="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::gradients", agg="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::aggregation", mode=mode, utype=utype, freq=freq, epochs=epochs, batchsize=batchsize, k=workers, scheme=scheme, hyperparams=params, checkpointing="NONE")
 
   W1 = as.matrix(modelList2["W1"])
   b1 = as.matrix(modelList2["b1"])

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml b/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
index 2279d58..ba22942 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
@@ -42,7 +42,7 @@ workers = 2
 batchsize = 32
 
 # Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "BATCH", batchsize,"DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "BATCH", batchsize,"DISJOINT_CONTIGUOUS", "LOCAL")
 
 # Compute validation loss & accuracy
 probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml b/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
index 1824083..c8c6a2f 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
@@ -42,7 +42,7 @@ workers = 2
 batchsize = 32
 
 # Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "EPOCH", batchsize, "DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "EPOCH", batchsize, "DISJOINT_CONTIGUOUS", "LOCAL")
 
 # Compute validation loss & accuracy
 probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
index 2e09de4..78fc1c4 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
@@ -42,7 +42,7 @@ workers = 2
 batchsize = 32
 
 # Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_CONTIGUOUS", "LOCAL")
 
 # Compute validation loss & accuracy
 probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
index 8444952..9191b5a 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
@@ -42,7 +42,7 @@ workers = 2
 batchsize = 32
 
 # Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_RANDOM")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_RANDOM", "LOCAL")
 
 # Compute validation loss & accuracy
 probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
index ccb7ffc..ec18cb4 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
@@ -42,7 +42,7 @@ workers = 4
 batchsize = 32
 
 # Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_ROUND_ROBIN")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_ROUND_ROBIN", "LOCAL")
 
 # Compute validation loss & accuracy
 probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
index 4afc56b..928dde2 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
@@ -42,7 +42,7 @@ workers = 2
 batchsize = 32
 
 # Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "OVERLAP_RESHUFFLE")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "OVERLAP_RESHUFFLE", "LOCAL")
 
 # Compute validation loss & accuracy
 probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
index c542286..8605984 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
@@ -42,7 +42,7 @@ workers = 2
 batchsize = 32
 
 # Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "EPOCH", batchsize,"DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "EPOCH", batchsize,"DISJOINT_CONTIGUOUS", "LOCAL")
 
 # Compute validation loss & accuracy
 probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml b/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml
new file mode 100644
index 0000000..31d44aa
--- /dev/null
+++ b/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml
@@ -0,0 +1,53 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+source("src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml") as mnist_lenet
+source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
+
+# Generate the training data once; the second redundant call to
+# generate_dummy_data (whose X/Y were immediately overwritten by the
+# split below) has been removed.
+[images, labels, C, Hin, Win] = mnist_lenet::generate_dummy_data()
+n = nrow(images)
+
+# Split into training (90%) and validation (10%)
+val_size = n * 0.1
+X = images[(val_size+1):n,]
+X_val = images[1:val_size,]
+Y = labels[(val_size+1):n,]
+Y_val = labels[1:val_size,]
+
+# Arguments
+epochs = 10
+workers = 2
+batchsize = 16
+
+# Train: BSP updates, per-batch frequency, disjoint-contiguous
+# partitioning, executed on the spark backend (REMOTE_SPARK)
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_CONTIGUOUS", "REMOTE_SPARK")
+
+# Compute validation loss & accuracy
+probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
+loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+
+# Output results
+print("Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
index 26ea638..d1b3a6d 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
@@ -29,9 +29,10 @@ import org.junit.runners.Suite;
 	LocalDataPartitionerTest.class,
 	SparkDataPartitionerTest.class,
 	ParamservSyntaxTest.class,
+	SerializationTest.class,
 	ParamservRecompilationTest.class,
 	ParamservRuntimeNegativeTest.class,
-	ParamservNNTest.class
+	ParamservLocalNNTest.class
 })
 
 


[2/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function shipping and worker setup

Posted by mb...@apache.org.
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index 3d61625..4e7a718 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -39,6 +39,8 @@ import static org.apache.sysml.parser.Statement.PS_UPDATE_TYPE;
 import static org.apache.sysml.parser.Statement.PS_VAL_FEATURES;
 import static org.apache.sysml.parser.Statement.PS_VAL_LABELS;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -54,6 +56,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.hops.recompile.Recompiler;
+import org.apache.sysml.lops.LopProperties;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -65,10 +69,12 @@ import org.apache.sysml.runtime.controlprogram.paramserv.LocalPSWorker;
 import org.apache.sysml.runtime.controlprogram.paramserv.LocalParamServer;
 import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer;
 import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody;
 import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSWorker;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.utils.Statistics;
 
 public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruction {
@@ -110,13 +116,30 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
 	private void runOnSpark(SparkExecutionContext sec, PSModeType mode) {
 		PSScheme scheme = getScheme();
 		int workerNum = getWorkerNum(mode);
+		String updFunc = getParam(PS_UPDATE_FUN);
+		String aggFunc = getParam(PS_AGGREGATION_FUN);
+
+		int k = getParLevel(workerNum);
+
+		// Get the compiled execution context
+		LocalVariableMap newVarsMap = createVarsMap(sec);
+		ExecutionContext newEC = ParamservUtils.createExecutionContext(sec, newVarsMap, updFunc, aggFunc, k);
 
 		MatrixObject features = sec.getMatrixObject(getParam(PS_FEATURES));
 		MatrixObject labels = sec.getMatrixObject(getParam(PS_LABELS));
 
-		SparkPSWorker worker = new SparkPSWorker();
+		// Force all the instructions to CP type
+		Recompiler.recompileProgramBlockHierarchy2Forced(
+			newEC.getProgram().getProgramBlocks(), 0, new HashSet<>(), LopProperties.ExecType.CP);
+		
+		// Serialize all the needed params for remote workers
+		SparkPSBody body = new SparkPSBody(newEC);
+		HashMap<String, byte[]> clsMap = new HashMap<>();
+		String program = ProgramConverter.serializeSparkPSBody(body, clsMap);
+
+		SparkPSWorker worker = new SparkPSWorker(getParam(PS_UPDATE_FUN), getFrequency(), getEpochs(), getBatchSize(), program, clsMap);
 		ParamservUtils.doPartitionOnSpark(sec, features, labels, scheme, workerNum) // Do data partitioning
-				.foreach(worker);   // Run remote workers
+			.foreach(worker);   // Run remote workers
 
 	}
 
@@ -132,15 +155,14 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
 		int k = getParLevel(workerNum);
 
 		// Get the compiled execution context
-		// Create workers' execution context
 		LocalVariableMap newVarsMap = createVarsMap(ec);
-		List<ExecutionContext> newECs = ParamservUtils.createExecutionContexts(ec, newVarsMap, updFunc, aggFunc, workerNum, k);
+		ExecutionContext newEC = ParamservUtils.createExecutionContext(ec, newVarsMap, updFunc, aggFunc, k);
 
 		// Create workers' execution context
-		List<ExecutionContext> workerECs = newECs.subList(0, newECs.size() - 1);
+		List<ExecutionContext> workerECs = ParamservUtils.copyExecutionContext(newEC, workerNum);
 
 		// Create the agg service's execution context
-		ExecutionContext aggServiceEC = newECs.get(newECs.size() - 1);
+		ExecutionContext aggServiceEC = ParamservUtils.copyExecutionContext(newEC, 1).get(0);
 
 		PSFrequency freq = getFrequency();
 		PSUpdateType updateType = getUpdateType();

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
index 7bf941d..a1c89a3 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
@@ -40,7 +40,7 @@ import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java
new file mode 100644
index 0000000..1d2115e
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java
@@ -0,0 +1,1838 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.stream.Collectors;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.CompilerConfig.ConfigType;
+import org.apache.sysml.conf.CompilerConfig;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.hops.Hop;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.hops.recompile.Recompiler;
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.parser.DMLProgram;
+import org.apache.sysml.parser.DataIdentifier;
+import org.apache.sysml.parser.ForStatementBlock;
+import org.apache.sysml.parser.IfStatementBlock;
+import org.apache.sysml.parser.ParForStatementBlock;
+import org.apache.sysml.parser.StatementBlock;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
+import org.apache.sysml.parser.WhileStatementBlock;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.codegen.CodegenUtils;
+import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock;
+import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP;
+import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock;
+import org.apache.sysml.runtime.controlprogram.IfProgramBlock;
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.Program;
+import org.apache.sysml.runtime.controlprogram.ProgramBlock;
+import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody;
+import org.apache.sysml.runtime.controlprogram.parfor.ParForBody;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.instructions.CPInstructionParser;
+import org.apache.sysml.runtime.instructions.Instruction;
+import org.apache.sysml.runtime.instructions.InstructionParser;
+import org.apache.sysml.runtime.instructions.MRJobInstruction;
+import org.apache.sysml.runtime.instructions.cp.BooleanObject;
+import org.apache.sysml.runtime.instructions.cp.CPInstruction;
+import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.instructions.cp.DoubleObject;
+import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
+import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.instructions.cp.ListObject;
+import org.apache.sysml.runtime.instructions.cp.ScalarObject;
+import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction;
+import org.apache.sysml.runtime.instructions.cp.StringObject;
+import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysml.runtime.instructions.gpu.GPUInstruction;
+import org.apache.sysml.runtime.instructions.mr.MRInstruction;
+import org.apache.sysml.runtime.instructions.spark.SPInstruction;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.MetaDataFormat;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.udf.ExternalFunctionInvocationInstruction;
+
+/**
+ * Program converter functionalities for 
+ *   (1) creating deep copies of program blocks, instructions, function program blocks, and 
+ *   (2) serializing and parsing of programs, program blocks, functions program blocks.
+ * 
+ */
+//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design)
+public class ProgramConverter 
+{
+	protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName());
+
+	//use escaped unicodes for separators in order to prevent string conflict
+	public static final String NEWLINE           = "\n"; //System.lineSeparator();
+	public static final String COMPONENTS_DELIM  = "\u236e"; //semicolon w/ bar; ";";
+	public static final String ELEMENT_DELIM     = "\u236a"; //comma w/ bar; ",";
+	public static final String ELEMENT_DELIM2    = ",";
+	public static final String DATA_FIELD_DELIM  = "\u007c"; //"|";
+	public static final String KEY_VALUE_DELIM   = "\u003d"; //"=";
+	public static final String LEVELIN           = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{";
+	public static final String LEVELOUT          = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}";
+	public static final String EMPTY             = "null";
+	public static final String DASH              = "-";
+	public static final String REF               = "ref";
+	public static final String LIST_ELEMENT_DELIM = "\t";
+
+	//CDATA wrapper markers that frame an entire serialized body payload
+	public static final String CDATA_BEGIN = "<![CDATA[";
+	public static final String CDATA_END = " ]]>";
+	
+	//begin/end markers for the individual serialized program components
+	public static final String PROG_BEGIN = " PROG" + LEVELIN;
+	public static final String PROG_END = LEVELOUT;
+	public static final String VARS_BEGIN = "VARS: ";
+	public static final String VARS_END = "";
+	public static final String PBS_BEGIN = " PBS" + LEVELIN;
+	public static final String PBS_END = LEVELOUT;
+	public static final String INST_BEGIN = " INST: ";
+	public static final String INST_END = "";
+	public static final String EC_BEGIN = " EC: ";
+	public static final String EC_END = "";
+	public static final String PB_BEGIN = " PB" + LEVELIN;
+	public static final String PB_END = LEVELOUT;
+	public static final String PB_WHILE = " WHILE" + LEVELIN;
+	public static final String PB_FOR = " FOR" + LEVELIN;
+	public static final String PB_PARFOR = " PARFOR" + LEVELIN;
+	public static final String PB_IF = " IF" + LEVELIN;
+	public static final String PB_FC = " FC" + LEVELIN;
+	public static final String PB_EFC = " EFC" + LEVELIN;
+	
+	//key for serializing additional configuration flags (e.g., statistics)
+	public static final String CONF_STATS = "stats";
+
+	// Used for parfor
+	public static final String PARFORBODY_BEGIN  = CDATA_BEGIN + "PARFORBODY" + LEVELIN;
+	public static final String PARFORBODY_END    = LEVELOUT + CDATA_END;
+
+	// Used for paramserv builtin function
+	public static final String PSBODY_BEGIN = CDATA_BEGIN + "PSBODY" + LEVELIN;
+	public static final String PSBODY_END = LEVELOUT + CDATA_END;
+	
+	//exception msgs
+	//NOTE(review): "PRogramBlockCP" spelling below is part of the historical message text
+	public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " +
+			                                                       "(ExternalFunctionPRogramBlockCP can be used)";
+	public static final String NOT_SUPPORTED_MR_INSTRUCTION      = "Not supported: Instructions of type other than CP instructions";
+	public static final String NOT_SUPPORTED_MR_PARFOR           = "Not supported: Nested ParFOR REMOTE_MR due to possible deadlocks." +
+			                                                       "(LOCAL can be used for innner ParFOR)";
+	public static final String NOT_SUPPORTED_PB                  = "Not supported: type of program block";
+	
+	////////////////////////////////
+	// CREATION of DEEP COPIES
+	////////////////////////////////
+	
+	/**
+	 * Creates a deep copy of the given execution context: a new context over
+	 * the same program with a cloned symbol table. Result variables flagged
+	 * for in-place update additionally get their own matrix object copies so
+	 * that parallel workers do not conflict on shared buffers.
+	 * 
+	 * @param ec execution context to copy
+	 * @return new execution context with cloned variable map
+	 * @throws CloneNotSupportedException if cloning the variable map fails
+	 */
+	public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec) 
+		throws CloneNotSupportedException
+	{
+		ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram());
+		cpec.setVariables((LocalVariableMap) ec.getVariables().clone());
+	
+		//handle result variables with in-place update flag
+		//(each worker requires its own copy of the empty matrix object)
+		for( String var : cpec.getVariables().keySet() ) {
+			Data dat = cpec.getVariables().get(var);
+			if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) {
+				MatrixObject mo = (MatrixObject)dat;
+				MatrixObject moNew = new MatrixObject(mo); 
+				if( mo.getNnz() != 0 ){
+					// If output matrix is not empty (NNZ != 0), then local copy is created so that 
+					// update in place operation can be applied.
+					MatrixBlock mbVar = mo.acquireRead();
+					moNew.acquireModify (new MatrixBlock(mbVar));
+					mo.release();
+				} else {
+					//create empty matrix block w/ dense representation (preferred for update in-place)
+					//Creating a dense matrix block is valid because empty block not allocated and transfer 
+					// to sparse representation happens in left indexing in place operation.
+					moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false));
+				}
+				moNew.release();
+				cpec.setVariable(var, moNew);
+			}
+		}
+		
+		return cpec;
+	}
+	
+	/**
+	 * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the
+	 * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order
+	 * to support arbitrary control-flow constructs within a parfor. 
+	 * 
+	 * @param childBlocks child program blocks
+	 * @param pid worker/thread id, appended to copied function names and set as thread id on copied blocks
+	 * @param IDPrefix id prefix of the enclosing remote parworker, or -1 if still on the master node
+	 * @param fnStack function keys currently being copied (guards against infinite recursion on recursive functions)
+	 * @param fnCreated function keys of function copies created so far
+	 * @param plain if true, full deep copy without id replacement
+	 * @param forceDeepCopy if true, force deep copy
+	 * @return list of program blocks
+	 */
+	public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) 
+	{
+		ArrayList<ProgramBlock> tmp = new ArrayList<>();
+		
+		for( ProgramBlock pb : childBlocks )
+		{
+			Program prog = pb.getProgram();
+			ProgramBlock tmpPB = null;
+			
+			//dispatch on the concrete program block type
+			if( pb instanceof WhileProgramBlock ) {
+				tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+			}
+			else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) {
+				tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy );
+			}
+			else if( pb instanceof ParForProgramBlock ) {
+				ParForProgramBlock pfpb = (ParForProgramBlock) pb;
+				//nested parfor is downgraded to a plain for copy unless nested parallelism is allowed
+				if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM )
+					tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+				else 
+					tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+			}
+			else if( pb instanceof IfProgramBlock ) {
+				tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+			}
+			else { //last-level program block
+				tmpPB = new ProgramBlock(prog); // general case use for most PBs
+				
+				//for recompile in the master node JVM
+				tmpPB.setStatementBlock(createStatementBlockCopy(pb.getStatementBlock(), pid, plain, forceDeepCopy)); 
+				//tmpPB.setStatementBlock(pb.getStatementBlock()); 
+				tmpPB.setThreadID(pid);
+			}
+
+			//copy instructions
+			tmpPB.setInstructions( createDeepCopyInstructionSet(pb.getInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+			
+			//copy symbol table
+			//tmpPB.setVariables( pb.getVariables() ); //implicit cloning
+			
+			tmp.add(tmpPB);
+		}
+		
+		return tmp;
+	}
+
+	/**
+	 * Creates a deep copy of a while program block, including predicate and
+	 * exit instructions, statement block, and (recursively) all child blocks.
+	 */
+	public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+		ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
+		WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst);
+		tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) );
+		tmpPB.setThreadID(pid);
+		tmpPB.setExitInstructions2( createDeepCopyInstructionSet(wpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
+		tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
+		return tmpPB;
+	}
+
+	/**
+	 * Creates a deep copy of an if program block, including predicate and
+	 * exit instructions, statement block, and both if/else branch bodies.
+	 */
+	public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+		ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
+		IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst);
+		tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) );
+		tmpPB.setThreadID(pid);
+		tmpPB.setExitInstructions2( createDeepCopyInstructionSet(ipb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
+		tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
+		tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
+		return tmpPB;
+	}
+
+	/**
+	 * Creates a deep copy of a for program block, including from/to/increment
+	 * and exit instructions, statement block, and all child blocks.
+	 */
+	public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+		ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
+		tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy));
+		tmpPB.setThreadID(pid);
+		tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+		tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+		tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+		tmpPB.setExitInstructions( createDeepCopyInstructionSet(fpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+		tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) );
+		return tmpPB;
+	}
+
+	/**
+	 * Creates a shallow copy of a for program block: a new block object that
+	 * shares the original's instruction lists and child blocks (no cloning).
+	 * Note: statement block and thread id are intentionally not copied here.
+	 */
+	public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) {
+		ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
+		tmpPB.setFromInstructions( fpb.getFromInstructions() );
+		tmpPB.setToInstructions( fpb.getToInstructions() );
+		tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() );
+		tmpPB.setExitInstructions( fpb.getExitInstructions() );
+		tmpPB.setChildBlocks( fpb.getChildBlocks() );
+		return tmpPB;
+	}
+
+	/**
+	 * Creates a deep copy of a parfor program block. Optimization and monitor
+	 * reporting are disabled on the copy because both already happened in the
+	 * top-level parfor. Child blocks are only deep-copied when plain or
+	 * forceDeepCopy is requested; otherwise they are shared (see note below).
+	 */
+	public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+		ParForProgramBlock tmpPB = null;
+		
+		if( IDPrefix == -1 ) //still on master node
+			tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
+		else //child of remote ParWorker at any level
+			tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
+		
+		tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
+		tmpPB.setThreadID(pid);
+		
+		tmpPB.disableOptimization(); //already done in top-level parfor
+		tmpPB.disableMonitorReport(); //already done in top-level parfor
+		
+		tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+		tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+		tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+		tmpPB.setExitInstructions( createDeepCopyInstructionSet(pfpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+
+		//NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway 
+		//and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested.
+		if( plain || forceDeepCopy )
+			tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); 
+		else
+			tmpPB.setChildBlocks( pfpb.getChildBlocks() );
+		
+		return tmpPB;
+	}
+	
+	/**
+	 * This creates a deep copy of a function program block. The central reference to singletons of function program blocks
+	 * poses the need for explicit copies in order to prevent conflicting writes of temporary variables
+	 * (see ExternalFunctionProgramBlock). The copy is registered in the program under a new function name
+	 * that carries the worker id suffix (unless plain), and recorded in fnCreated.
+	 * 
+	 * @param namespace function namespace
+	 * @param oldName original function name
+	 * @param pid worker/thread id, appended to the new function name (unless plain)
+	 * @param IDPrefix id prefix of the enclosing remote parworker, or -1 if still on the master node
+	 * @param prog runtime program
+	 * @param fnStack function keys currently being copied (stops deep copy on recursive function calls)
+	 * @param fnCreated function keys of function copies created so far
+	 * @param plain if true, keep original function name and skip id replacement
+	 */
+	public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain) 
+	{
+		//fpb guaranteed to be non-null (checked inside getFunctionProgramBlock)
+		FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName);
+		String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid); 
+		String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew);
+
+		if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) )
+			return; //prevent redundant deep copy if already existent
+		
+		//create deep copy
+		FunctionProgramBlock copy = null;
+		ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
+		ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
+		if( fpb.getInputParams()!= null )
+			tmp1.addAll(fpb.getInputParams());
+		if( fpb.getOutputParams()!= null )
+			tmp2.addAll(fpb.getOutputParams());
+		
+		//external functions: copy with base directory renamed from the old to the new thread id
+		if( fpb instanceof ExternalFunctionProgramBlockCP ) {
+			ExternalFunctionProgramBlockCP efpb = (ExternalFunctionProgramBlockCP) fpb;
+			HashMap<String,String> tmp3 = efpb.getOtherParams();
+			if( IDPrefix!=-1 )
+				copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_CHILD_THREAD+IDPrefix,Lop.CP_CHILD_THREAD+pid));
+			else
+				copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_ROOT_THREAD_ID,Lop.CP_CHILD_THREAD+pid));
+		}
+		else if( fpb instanceof ExternalFunctionProgramBlock ) {
+			ExternalFunctionProgramBlock efpb = (ExternalFunctionProgramBlock) fpb;
+			HashMap<String,String> tmp3 = efpb.getOtherParams();
+			if( IDPrefix!=-1 )
+				copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_CHILD_THREAD+IDPrefix, Lop.CP_CHILD_THREAD+pid));
+			else
+				copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid));
+		}
+		else {
+			//regular functions: deep copy child blocks, guarded by fnStack against recursion
+			if( !fnStack.contains(fnameNewKey) ) {
+				fnStack.add(fnameNewKey);
+				copy = new FunctionProgramBlock(prog, tmp1, tmp2);
+				copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
+				copy.setRecompileOnce( fpb.isRecompileOnce() );
+				copy.setThreadID(pid);
+				fnStack.remove(fnameNewKey);
+			}
+			else //stop deep copy for recursive function calls
+				copy = fpb;
+		}
+		
+		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
+		//note: instructions not used by function program block
+		
+		//put 
+		prog.addFunctionProgramBlock(namespace, fnameNew, copy);
+		fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
+	}
+
+	/**
+	 * Creates a deep copy of the given function program block without any
+	 * id replacement (plain copy, pid 0, no remote id prefix). Intended for
+	 * copying a function in place rather than registering a renamed variant.
+	 * 
+	 * @param fpb function program block to copy (must be non-null)
+	 * @param fnStack function keys currently being copied (recursion guard)
+	 * @param fnCreated function keys of function copies created so far
+	 * @return deep copy of the function program block
+	 */
+	public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, HashSet<String> fnStack, HashSet<String> fnCreated) 
+	{
+		if( fpb == null )
+			throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock.");
+	
+		//create deep copy
+		FunctionProgramBlock copy = null;
+		ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
+		ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
+		if( fpb.getInputParams()!= null )
+			tmp1.addAll(fpb.getInputParams());
+		if( fpb.getOutputParams()!= null )
+			tmp2.addAll(fpb.getOutputParams());
+		
+		copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
+		copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) );
+		copy.setStatementBlock( fpb.getStatementBlock() );
+		copy.setRecompileOnce(fpb.isRecompileOnce());
+		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
+		//note: instructions not used by function program block
+	
+		return copy;
+	}
+
+	
+	/**
+	 * Creates a deep copy of an array of instructions and replaces the placeholders of parworker
+	 * IDs with the concrete IDs of this parfor instance. This is a helper method used for generating
+	 * deep copies of program blocks.
+	 * 
+	 * @param instSet list of instructions
+	 * @param pid worker/thread id used for placeholder replacement
+	 * @param IDPrefix id prefix of the enclosing remote parworker, or -1 if still on the master node
+	 * @param prog runtime program
+	 * @param fnStack function keys currently being copied (recursion guard)
+	 * @param fnCreated function keys of function copies created so far
+	 * @param plain if true, copy without function-name id replacement
+	 * @param cpFunctions if true, deep copy the function program blocks of encountered function calls
+	 * @return list of instructions
+	 */
+	public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean cpFunctions) {
+		ArrayList<Instruction> tmp = new ArrayList<>();
+		for( Instruction inst : instSet ) {
+			//first copy the called function (if requested), then clone the call itself
+			if( inst instanceof FunctionCallCPInstruction && cpFunctions ) {
+				FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst;
+				createDeepCopyFunctionProgramBlock( finst.getNamespace(),
+					finst.getFunctionName(), pid, IDPrefix, prog, fnStack, fnCreated, plain );
+			}
+			tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) );
+		}
+		return tmp;
+	}
+
+	/**
+	 * Clones a single instruction by re-parsing its string representation
+	 * (CP/SP/MR/GPU instructions) or via copy constructor (MR job instructions),
+	 * replacing root-thread-id placeholders with the concrete worker id.
+	 * 
+	 * @param oInst original instruction
+	 * @param pid worker/thread id for placeholder and function-name replacement
+	 * @param plain if true, preserve function names as-is
+	 * @param cpFunctions if true, rename function calls to their pid-suffixed copies
+	 * @return cloned instruction
+	 */
+	public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions ) 
+	{
+		Instruction inst = null;
+		String tmpString = oInst.toString();
+		
+		try
+		{
+			if( oInst instanceof CPInstruction || oInst instanceof SPInstruction || oInst instanceof MRInstruction 
+				|| oInst instanceof GPUInstruction ) {
+				if( oInst instanceof FunctionCallCPInstruction && cpFunctions ) {
+					FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst;
+					if( !plain ) {
+						//safe replacement because target variables might include the function name
+						//note: this is no update-in-place in order to keep the original function name as basis
+						tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + Lop.CP_CHILD_THREAD+pid);
+					}
+					//otherwise: preserve function name
+				}
+				inst = InstructionParser.parseSingleInstruction(tmpString);
+			}
+			else if( oInst instanceof MRJobInstruction ) {
+				//clone via copy constructor
+				inst = new MRJobInstruction( (MRJobInstruction)oInst );
+			}
+			else
+				throw new DMLRuntimeException("Failed to clone instruction: "+oInst);
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		
+		//save replacement of thread id references in instructions
+		inst = saveReplaceThreadID( inst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid);
+		
+		return inst;
+	}
+
+	/**
+	 * Creates a copy of a generic statement block. A deep copy (with a cloned
+	 * hops DAG for concurrent recompilation) is only created when parallel
+	 * dynamic recompilation is enabled and the block requires recompilation
+	 * (or forceDeepCopy is set); otherwise the original block is returned.
+	 * 
+	 * @param sb statement block to copy (may be null; then returned as-is)
+	 * @param pid worker/thread id used to update function names in the copied hops
+	 * @param plain if true, skip function-name updates in the copied hops
+	 * @param forceDeepCopy if true, force a deep copy regardless of recompilation needs
+	 * @return copied (or original) statement block
+	 */
+	public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
+	{
+		StatementBlock ret = null;
+		
+		try
+		{
+			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
+				&& sb != null  //forced deep copy for function recompilation
+				&& (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy)  )
+			{
+				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+				ret = new StatementBlock();
+				ret.setDMLProg(sb.getDMLProg());
+				ret.setParseInfo(sb);
+				ret.setLiveIn( sb.liveIn() );
+				ret.setLiveOut( sb.liveOut() );
+				ret.setUpdatedVariables( sb.variablesUpdated() );
+				ret.setReadVariables( sb.variablesRead() );
+				
+				//deep copy hops dag for concurrent recompile
+				ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() );
+				if( !plain )
+					Recompiler.updateFunctionNames( hops, pid );
+				ret.setHops( hops );
+				ret.updateRecompilationFlag();
+			}
+			else {
+				ret = sb;
+			}
+		}
+		catch( Exception ex ) {
+			throw new DMLRuntimeException( ex );
+		}
+		
+		return ret;
+	}
+
+	/**
+	 * Creates a copy of an if statement block, deep-copying the predicate hops
+	 * DAG when parallel dynamic recompilation is enabled and the predicate
+	 * requires recompilation (or forceDeepCopy); otherwise returns the original.
+	 * Child statements are shallow-copied.
+	 */
+	public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) 
+	{
+		IfStatementBlock ret = null;
+		
+		try
+		{
+			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
+				&& sb != null //forced deep copy for function recompile
+				&& (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy)  )
+			{
+				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+				ret = new IfStatementBlock();
+				ret.setDMLProg(sb.getDMLProg());
+				ret.setParseInfo(sb);
+				ret.setLiveIn( sb.liveIn() );
+				ret.setLiveOut( sb.liveOut() );
+				ret.setUpdatedVariables( sb.variablesUpdated() );
+				ret.setReadVariables( sb.variablesRead() );
+				
+				//shallow copy child statements
+				ret.setStatements( sb.getStatements() );
+				
+				//deep copy predicate hops dag for concurrent recompile
+				Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
+				ret.setPredicateHops( hops );
+				ret.updatePredicateRecompilationFlag();
+			}
+			else {
+				ret = sb;
+			}
+		}
+		catch( Exception ex ) {
+			throw new DMLRuntimeException( ex );
+		}
+		
+		return ret;
+	}
+
+	/**
+	 * Creates a copy of a while statement block, deep-copying the predicate
+	 * hops DAG when parallel dynamic recompilation is enabled and the
+	 * predicate requires recompilation (or forceDeepCopy); otherwise returns
+	 * the original. Child statements are shallow-copied; update-in-place
+	 * variables are carried over.
+	 */
+	public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) 
+	{
+		WhileStatementBlock ret = null;
+		
+		try
+		{
+			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
+				&& sb != null  //forced deep copy for function recompile
+				&& (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy)  )
+			{
+				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+				ret = new WhileStatementBlock();
+				ret.setDMLProg(sb.getDMLProg());
+				ret.setParseInfo(sb);
+				ret.setLiveIn( sb.liveIn() );
+				ret.setLiveOut( sb.liveOut() );
+				ret.setUpdatedVariables( sb.variablesUpdated() );
+				ret.setReadVariables( sb.variablesRead() );
+				ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
+				
+				//shallow copy child statements
+				ret.setStatements( sb.getStatements() );
+				
+				//deep copy predicate hops dag for concurrent recompile
+				Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
+				ret.setPredicateHops( hops );
+				ret.updatePredicateRecompilationFlag();
+			}
+			else {
+				ret = sb;
+			}
+		}
+		catch( Exception ex ) {
+			throw new DMLRuntimeException( ex );
+		}
+		
+		return ret;
+	}
+
+	/**
+	 * Creates a copy of a for (or parfor) statement block, deep-copying the
+	 * from/to/increment hops DAGs that require recompilation when parallel
+	 * dynamic recompilation is enabled (or forceDeepCopy); otherwise returns
+	 * the original. Preserves the parfor subtype of the input block.
+	 */
+	public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) 
+	{
+		ForStatementBlock ret = null;
+		
+		try
+		{
+			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
+				&& sb != null 
+				&& ( Recompiler.requiresRecompilation(sb.getFromHops()) ||
+					 Recompiler.requiresRecompilation(sb.getToHops()) ||
+					 Recompiler.requiresRecompilation(sb.getIncrementHops()) ||
+					 forceDeepCopy )  )
+			{
+				ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock();
+				
+				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+				ret.setDMLProg(sb.getDMLProg());
+				ret.setParseInfo(sb);
+				ret.setLiveIn( sb.liveIn() );
+				ret.setLiveOut( sb.liveOut() );
+				ret.setUpdatedVariables( sb.variablesUpdated() );
+				ret.setReadVariables( sb.variablesRead() );
+				ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
+				
+				//shallow copy child statements
+				ret.setStatements( sb.getStatements() );
+				
+				//deep copy predicate hops dag for concurrent recompile
+				if( sb.requiresFromRecompilation() ){
+					Hop hops = Recompiler.deepCopyHopsDag( sb.getFromHops() );
+					ret.setFromHops( hops );
+				}
+				if( sb.requiresToRecompilation() ){
+					Hop hops = Recompiler.deepCopyHopsDag( sb.getToHops() );
+					ret.setToHops( hops );
+				}
+				if( sb.requiresIncrementRecompilation() ){
+					Hop hops = Recompiler.deepCopyHopsDag( sb.getIncrementHops() );
+					ret.setIncrementHops( hops );
+				}
+				ret.updatePredicateRecompilationFlags();
+			}
+			else {
+				ret = sb;
+			}
+		}
+		catch( Exception ex ) {
+			throw new DMLRuntimeException( ex );
+		}
+		
+		return ret;
+	}
+	
+	
+	////////////////////////////////
+	// SERIALIZATION 
+	////////////////////////////////
+
+	/**
+	 * Serializes a Spark paramserv body into a single string for shipping the
+	 * worker setup to Spark executors. The payload (framed by PSBODY_BEGIN/END)
+	 * contains the DMLScript UUID, the DML configuration, additional flags
+	 * (statistics), the function program blocks, the execution context, and
+	 * the program blocks of the wrapped execution context's program.
+	 * 
+	 * @param body Spark PS body wrapping the execution context to serialize
+	 * @param clsMap map of class names to class bytes; presumably filled with
+	 *        generated classes during instruction serialization — not visible here
+	 * @return serialized body string
+	 */
+	public static String serializeSparkPSBody(SparkPSBody body, HashMap<String, byte[]> clsMap) {
+
+		ExecutionContext ec = body.getEc();
+		StringBuilder builder = new StringBuilder();
+		builder.append(PSBODY_BEGIN);
+		builder.append(NEWLINE);
+
+		//handle DMLScript UUID (propagate original uuid for writing to scratch space)
+		builder.append(DMLScript.getUUID());
+		builder.append(COMPONENTS_DELIM);
+		builder.append(NEWLINE);
+
+		//handle DML config
+		builder.append(ConfigurationManager.getDMLConfig().serializeDMLConfig());
+		builder.append(COMPONENTS_DELIM);
+		builder.append(NEWLINE);
+
+		//handle additional configurations
+		builder.append(CONF_STATS + "=" + DMLScript.STATISTICS);
+		builder.append(COMPONENTS_DELIM);
+		builder.append(NEWLINE);
+
+		//handle program
+		builder.append(PROG_BEGIN);
+		builder.append(NEWLINE);
+		builder.append(rSerializeFunctionProgramBlocks(ec.getProgram().getFunctionProgramBlocks(),
+			new HashSet<>(ec.getProgram().getFunctionProgramBlocks().keySet()), clsMap));
+		builder.append(PROG_END);
+		builder.append(NEWLINE);
+		builder.append(COMPONENTS_DELIM);
+		builder.append(NEWLINE);
+
+		//handle execution context
+		builder.append(EC_BEGIN);
+		builder.append(serializeExecutionContext(ec));
+		builder.append(EC_END);
+		builder.append(NEWLINE);
+		builder.append(COMPONENTS_DELIM);
+		builder.append(NEWLINE);
+
+		//handle program blocks
+		builder.append(PBS_BEGIN);
+		builder.append(NEWLINE);
+		builder.append(rSerializeProgramBlocks(ec.getProgram().getProgramBlocks(), clsMap));
+		builder.append(PBS_END);
+		builder.append(NEWLINE);
+
+		builder.append(PSBODY_END);
+
+		return builder.toString();
+	}
+
+	public static String serializeParForBody( ParForBody body ) {
+		return serializeParForBody(body, new HashMap<String, byte[]>());
+	}	
+	
+	public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) 
+	{
+		ArrayList<ProgramBlock> pbs = body.getChildBlocks();
+		ArrayList<ResultVar> rVnames = body.getResultVariables();
+		ExecutionContext ec = body.getEc();
+		
+		if( pbs.isEmpty() )
+			return PARFORBODY_BEGIN + PARFORBODY_END;
+		
+		Program prog = pbs.get( 0 ).getProgram();
+		
+		StringBuilder sb = new StringBuilder();
+		sb.append( PARFORBODY_BEGIN );
+		sb.append( NEWLINE );
+		
+		//handle DMLScript UUID (propagate original uuid for writing to scratch space)
+		sb.append( DMLScript.getUUID() );
+		sb.append( COMPONENTS_DELIM );
+		sb.append( NEWLINE );
+		
+		//handle DML config
+		sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() );
+		sb.append( COMPONENTS_DELIM );
+		sb.append( NEWLINE );
+		
+		//handle additional configurations
+		sb.append( CONF_STATS + "=" + DMLScript.STATISTICS );
+		sb.append( COMPONENTS_DELIM );
+		sb.append( NEWLINE );
+		
+		//handle program
+		sb.append(PROG_BEGIN);
+		sb.append( NEWLINE );
+		sb.append( serializeProgram(prog, pbs, clsMap) );
+		sb.append(PROG_END);
+		sb.append( NEWLINE );
+		sb.append( COMPONENTS_DELIM );
+		sb.append( NEWLINE );
+		
+		//handle result variable names
+		sb.append( serializeResultVariables(rVnames) );
+		sb.append( COMPONENTS_DELIM );
+		
+		//handle execution context
+		//note: this includes also the symbol table (serialize only the top-level variable map,
+		//      (symbol tables for nested/child blocks are created at parse time, on the remote side)
+		sb.append(EC_BEGIN);
+		sb.append( serializeExecutionContext(ec) );
+		sb.append(EC_END);
+		sb.append( NEWLINE );
+		sb.append( COMPONENTS_DELIM );
+		sb.append( NEWLINE );
+		
+		//handle program blocks
+		sb.append(PBS_BEGIN);
+		sb.append( NEWLINE );
+		sb.append( rSerializeProgramBlocks(pbs, clsMap) );
+		sb.append(PBS_END);
+		sb.append( NEWLINE );
+		
+		sb.append( PARFORBODY_END );
+		
+		return sb.toString();
+	}
+
+	private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) {
+		//note program contains variables, programblocks and function program blocks 
+		//but in order to avoid redundancy, we only serialize function program blocks
+		HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks();
+		HashSet<String> cand = new HashSet<>();
+		rFindSerializationCandidates(pbs, cand);
+		return rSerializeFunctionProgramBlocks( fpb, cand, clsMap );
+	}
+
+	private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) 
+	{
+		for( ProgramBlock pb : pbs )
+		{
+			if( pb instanceof WhileProgramBlock ) {
+				WhileProgramBlock wpb = (WhileProgramBlock) pb;
+				rFindSerializationCandidates(wpb.getChildBlocks(), cand );
+			}
+			else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock ) {
+				ForProgramBlock fpb = (ForProgramBlock) pb; 
+				rFindSerializationCandidates(fpb.getChildBlocks(), cand);
+			}
+			else if ( pb instanceof IfProgramBlock ) {
+				IfProgramBlock ipb = (IfProgramBlock) pb;
+				rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand);
+				if( ipb.getChildBlocksElseBody() != null )
+					rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand);
+			}
+			else { //all generic program blocks
+				for( Instruction inst : pb.getInstructions() )
+					if( inst instanceof FunctionCallCPInstruction ) {
+						FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst;
+						String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName());
+						if( !cand.contains(fkey) ) { //memoization for multiple calls, recursion
+							cand.add( fkey ); //add to candidates
+							//investigate chains of function calls
+							FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName());
+							rFindSerializationCandidates(fpb.getChildBlocks(), cand);
+						}
+					}
+			}
+		}
+	}
+
+	private static String serializeVariables (LocalVariableMap vars) {
+		StringBuilder sb = new StringBuilder();
+		sb.append(VARS_BEGIN);
+		sb.append( vars.serialize() );
+		sb.append(VARS_END);
+		return sb.toString();
+	}
+
	/**
	 * Serializes a single data object (scalar, matrix, or list) as a
	 * delimiter-separated string.
	 * SCHEMA: {@code <name>|<datatype>|<valuetype>|value[|<metadata>...][|<elements>...]}
	 * Scalars are serialized by value, matrices by filename plus meta data,
	 * and lists by reference with each element serialized recursively.
	 *
	 * @param key variable name used as the serialized object name
	 * @param dat data object to serialize
	 * @return serialized string representation of the data object
	 */
	public static String serializeDataObject(String key, Data dat) 
	{
		// SCHEMA: <name>|<datatype>|<valuetype>|value
		// (scalars are serialize by value, matrices by filename)
		StringBuilder sb = new StringBuilder();
		//prepare data for serialization
		String name = key;
		DataType datatype = dat.getDataType();
		ValueType valuetype = dat.getValueType();
		String value = null;
		String[] metaData = null;
		String[] listData = null;
		switch( datatype )
		{
			case SCALAR:
				ScalarObject so = (ScalarObject) dat;
				//name = so.getName();
				value = so.getStringValue();
				break;
			case MATRIX:
				//matrices are serialized by filename and meta data only
				MatrixObject mo = (MatrixObject) dat;
				MetaDataFormat md = (MetaDataFormat) dat.getMetaData();
				MatrixCharacteristics mc = md.getMatrixCharacteristics();
				value = mo.getFileName();
				PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat(
						mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE;
				//meta data: dims, block sizes, nnz, in/out formats, partitioning, update type
				//NOTE(review): the parser side must consume these 9 fields in the same order
				metaData = new String[9];
				metaData[0] = String.valueOf( mc.getRows() );
				metaData[1] = String.valueOf( mc.getCols() );
				metaData[2] = String.valueOf( mc.getRowsPerBlock() );
				metaData[3] = String.valueOf( mc.getColsPerBlock() );
				metaData[4] = String.valueOf( mc.getNonZeros() );
				metaData[5] = InputInfo.inputInfoToString( md.getInputInfo() );
				metaData[6] = OutputInfo.outputInfoToString( md.getOutputInfo() );
				metaData[7] = String.valueOf( partFormat );
				metaData[8] = String.valueOf( mo.getUpdateType() );
				break;
			case LIST:
				// SCHEMA: <name>|<datatype>|<valuetype>|value|<metadata>|<tab>element1<tab>element2<tab>element3 (this is the list)
				//         (for the element1) <listName-index>|<datatype>|<valuetype>|value
				//         (for the element2) <listName-index>|<datatype>|<valuetype>|value
				ListObject lo = (ListObject) dat;
				value = REF; //lists are serialized by reference, not by value
				metaData = new String[2];
				metaData[0] = String.valueOf(lo.getLength());
				metaData[1] = lo.getNames() == null ? EMPTY : serializeList(lo.getNames(), ELEMENT_DELIM2);
				//recursively serialize elements, named <listName-index>
				listData = new String[lo.getLength()];
				for (int index = 0; index < lo.getLength(); index++) {
					listData[index] = serializeDataObject(name + DASH + index, lo.slice(index));
				}
				break;
			default:
				throw new DMLRuntimeException("Unable to serialize datatype "+datatype);
		}
		
		//serialize data
		sb.append(name);
		sb.append(DATA_FIELD_DELIM);
		sb.append(datatype);
		sb.append(DATA_FIELD_DELIM);
		sb.append(valuetype);
		sb.append(DATA_FIELD_DELIM);
		sb.append(value);
		if( metaData != null )
			for( int i=0; i<metaData.length; i++ ) {
				sb.append(DATA_FIELD_DELIM);
				sb.append(metaData[i]);
			}
		if (listData != null) {
			sb.append(DATA_FIELD_DELIM);
			for (String ld : listData) {
				sb.append(LIST_ELEMENT_DELIM);
				sb.append(ld);
			}
		}
		
		return sb.toString();
	}
+
+	private static String serializeExecutionContext( ExecutionContext ec ) {
+		return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY;
+	}
+
+	@SuppressWarnings("all")
+	private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) 
+	{
+		StringBuilder sb = new StringBuilder();
+		int count = 0;
+		for( Instruction linst : inst ) {
+			//check that only cp instruction are transmitted 
+			if( !( linst instanceof CPInstruction || linst instanceof ExternalFunctionInvocationInstruction ) )
+				throw new DMLRuntimeException( NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst );
+			
+			//obtain serialized version of generated classes
+			if( linst instanceof SpoofCPInstruction ) {
+				Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass();
+				clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName()));
+			}
+			
+			if( count > 0 )
+				sb.append( ELEMENT_DELIM );
+			
+			sb.append( checkAndReplaceLiterals( linst.toString() ) );
+			count++;
+		}
+		
+		return sb.toString();
+	}
+	
+	/**
+	 * Replacement of internal delimiters occurring in literals of instructions
+	 * in order to ensure robustness of serialization and parsing.
+	 * (e.g. print( "a,b" ) would break the parsing of instruction that internally
+	 * are separated with a "," )
+	 * 
+	 * @param instStr instruction string
+	 * @return instruction string with replacements
+	 */
+	private static String checkAndReplaceLiterals( String instStr )
+	{
+		String tmp = instStr;
+		
+		//1) check own delimiters (very unlikely due to special characters)
+		if( tmp.contains(COMPONENTS_DELIM) ) {
+			tmp = tmp.replaceAll(COMPONENTS_DELIM, ".");
+			LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'");
+		}
+		
+		if( tmp.contains(ELEMENT_DELIM) ) {
+			tmp = tmp.replaceAll(ELEMENT_DELIM, ".");
+			LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'");
+		}
+			
+		if( tmp.contains( LEVELIN ) ){
+			tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex
+			LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('");
+		}
+
+		if( tmp.contains(LEVELOUT) ){
+			tmp = tmp.replaceAll(LEVELOUT, ")");
+			LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'");
+		}
+		
+		//NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required
+		//because those literals cannot occur in critical places.
+		
+		//2) check end tag of CDATA
+		if( tmp.contains(CDATA_END) ){
+			tmp = tmp.replaceAll(CDATA_END, "."); //prevent XML parsing issues in job.xml
+			LOG.warn("Replaced special literal character sequence "+ CDATA_END +" with '.'");
+		}
+		
+		return tmp;
+	}
+
+	private static String serializeStringHashMap(HashMap<String,String> vars) {
+		return serializeList(vars.entrySet().stream().map(e ->
+			e.getKey()+KEY_VALUE_DELIM+e.getValue()).collect(Collectors.toList()));
+	}
+
+	public static String serializeResultVariables( List<ResultVar> vars) {
+		return serializeList(vars.stream().map(v -> v._isAccum ?
+			v._name+"+" : v._name).collect(Collectors.toList()));
+	}
+	
+	public static String serializeList(List<String> elements) {
+		return serializeList(elements, ELEMENT_DELIM);
+	}
+	
	/**
	 * Serializes a list of strings into a single string, separated by the
	 * given delimiter (no escaping of delimiter occurrences in elements).
	 */
	public static String serializeList(List<String> elements, String delim) {
		return StringUtils.join(elements, delim);
	}
+
+	private static String serializeDataIdentifiers(List<DataIdentifier> vars) {
+		return serializeList(vars.stream().map(v ->
+			serializeDataIdentifier(v)).collect(Collectors.toList()));
+	}
+
+	private static String serializeDataIdentifier( DataIdentifier dat ) {
+		// SCHEMA: <name>|<datatype>|<valuetype>
+		StringBuilder sb = new StringBuilder();
+		sb.append(dat.getName());
+		sb.append(DATA_FIELD_DELIM);
+		sb.append(dat.getDataType());
+		sb.append(DATA_FIELD_DELIM);
+		sb.append(dat.getValueType());
+		return sb.toString();
+	}
+
+	private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) {
+		StringBuilder sb = new StringBuilder();
+		int count = 0;
+		for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() ) {
+			if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body
+				continue;
+			if( count>0 ) {
+				sb.append( ELEMENT_DELIM );
+				sb.append( NEWLINE );
+			}
+			sb.append( pb.getKey() );
+			sb.append( KEY_VALUE_DELIM );
+			sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) );
+			count++;
+		}
+		sb.append(NEWLINE);
+		return sb.toString();
+	}
+
+	private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) {
+		StringBuilder sb = new StringBuilder();
+		int count = 0;
+		for( ProgramBlock pb : pbs ) {
+			if( count>0 ) {
+				sb.append( ELEMENT_DELIM );
+				sb.append(NEWLINE);
+			}
+			sb.append( rSerializeProgramBlock(pb, clsMap) );
+			count++;
+		}
+		return sb.toString();
+	}
+
	/**
	 * Recursively serializes a single program block: a type header tag
	 * (while/for/parfor/if/function/external function/generic), followed by
	 * the block-specific components separated by COMPONENTS_DELIM, and a
	 * closing PB_END tag. Instruction lists are wrapped in INST_BEGIN/INST_END
	 * and child block lists in PBS_BEGIN/PBS_END; the parser side relies on
	 * the exact order of components emitted here.
	 *
	 * @param pb     program block to serialize
	 * @param clsMap output map collecting generated class names to their bytecode
	 * @return serialized string representation of the program block
	 */
	private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) {
		StringBuilder sb = new StringBuilder();
		
		//handle header
		//note: order matters, subclasses (e.g., ParForProgramBlock extends
		//ForProgramBlock) are excluded explicitly from their parent's case
		if( pb instanceof WhileProgramBlock ) 
			sb.append(PB_WHILE);
		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
			sb.append(PB_FOR);
		else if ( pb instanceof ParForProgramBlock )
			sb.append(PB_PARFOR);
		else if ( pb instanceof IfProgramBlock )
			sb.append(PB_IF);
		else if ( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
			sb.append(PB_FC);
		else if ( pb instanceof ExternalFunctionProgramBlock )
			sb.append(PB_EFC);
		else //all generic program blocks
			sb.append(PB_BEGIN);
		
		//handle body
		if( pb instanceof WhileProgramBlock )
		{
			//components: predicate instructions, exit instructions, child blocks
			WhileProgramBlock wpb = (WhileProgramBlock) pb;
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) );
			sb.append(PBS_END);
		}
		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) )
		{
			//components: iter var, from/to/increment instructions, exit instructions, child blocks
			ForProgramBlock fpb = (ForProgramBlock) pb; 
			sb.append( fpb.getIterVar() );
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(fpb.getExitInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
			sb.append(PBS_END);
		}
		else if ( pb instanceof ParForProgramBlock )
		{
			//components: iter var, result vars, parfor params, from/to/increment
			//instructions, exit instructions, child blocks
			ParForProgramBlock pfpb = (ParForProgramBlock) pb; 
			
			//check for nested remote ParFOR
			if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_MR )
				throw new DMLRuntimeException( NOT_SUPPORTED_MR_PARFOR );
			
			sb.append( pfpb.getIterVar() );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeResultVariables( pfpb.getResultVariables()) );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );	
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pfpb.getExitInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) );
			sb.append(PBS_END);
		}
		else if ( pb instanceof IfProgramBlock )
		{
			//components: predicate instructions, exit instructions, if body, else body
			IfProgramBlock ipb = (IfProgramBlock) pb;
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(ipb.getPredicate(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) );
			sb.append(PBS_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) );
			sb.append(PBS_END);
		}
		else if( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
		{
			//components: input params, output params, instructions, child blocks
			FunctionProgramBlock fpb = (FunctionProgramBlock) pb;
			
			sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(fpb.getInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
			sb.append(PBS_END);
			sb.append( COMPONENTS_DELIM );
		}
		else if( pb instanceof ExternalFunctionProgramBlock )
		{
			//components: input params, output params, other params, base dir,
			//empty instruction list (recreated on construction), child blocks
			if( !(pb instanceof ExternalFunctionProgramBlockCP) ) 
			{
				throw new DMLRuntimeException( NOT_SUPPORTED_EXTERNALFUNCTION_PB );
			}
			
			ExternalFunctionProgramBlockCP fpb = (ExternalFunctionProgramBlockCP) pb;
			
			sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeStringHashMap( fpb.getOtherParams() ) );
			sb.append( COMPONENTS_DELIM );
			sb.append( fpb.getBaseDir() );
			sb.append( COMPONENTS_DELIM );
			
			sb.append(INST_BEGIN);
			//create on construction anyway 
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
			sb.append(PBS_END);
		}
		else //all generic program blocks
		{
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pb.getInstructions(), clsMap) );
			sb.append(INST_END);
		}
		
		
		//handle end
		sb.append(PB_END);
		
		return sb.toString();
	}
+
+	
+	////////////////////////////////
+	// PARSING 
+	////////////////////////////////
+
+	public static SparkPSBody parseSparkPSBody(String in, int id) {
+		SparkPSBody body = new SparkPSBody();
+
+		//header elimination
+		String tmpin = in.replaceAll(NEWLINE, ""); //normalization
+		tmpin = tmpin.substring(PSBODY_BEGIN.length(), tmpin.length() - PSBODY_END.length()); //remove start/end
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);
+
+		//handle DMLScript UUID (NOTE: set directly in DMLScript)
+		//(master UUID is used for all nodes (in order to simply cleanup))
+		DMLScript.setUUID(st.nextToken());
+
+		//handle DML config (NOTE: set directly in ConfigurationManager)
+		handleDMLConfig(st.nextToken());
+
+		//handle additional configs
+		parseAndSetAdditionalConfigurations(st.nextToken());
+
+		//handle program
+		Program prog = parseProgram(st.nextToken(), id);
+
+		//handle execution context
+		ExecutionContext ec = parseExecutionContext(st.nextToken(), prog);
+		ec.setProgram(prog);
+
+		//handle program blocks
+		String spbs = st.nextToken();
+		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
+		prog.getProgramBlocks().addAll(pbs);
+
+		body.setEc(ec);
+		return body;
+	}
+
	/**
	 * Parses a serialized parfor body with the non-spark code path,
	 * i.e., additional configurations are parsed and applied.
	 */
	public static ParForBody parseParForBody( String in, int id ) {
		return parseParForBody(in, id, false);
	}
+	
+	public static ParForBody parseParForBody( String in, int id, boolean inSpark ) {
+		ParForBody body = new ParForBody();
+		
+		//header elimination
+		String tmpin = in.replaceAll(NEWLINE, ""); //normalization
+		tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);
+		
+		//handle DMLScript UUID (NOTE: set directly in DMLScript)
+		//(master UUID is used for all nodes (in order to simply cleanup))
+		DMLScript.setUUID( st.nextToken() );
+		
+		//handle DML config (NOTE: set directly in ConfigurationManager)
+		String confStr = st.nextToken();
+		JobConf job = ConfigurationManager.getCachedJobConf();
+		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+			handleDMLConfig(confStr);
+			//init internal configuration w/ parsed or default config
+			ParForProgramBlock.initInternalConfigurations(
+					ConfigurationManager.getDMLConfig());
+		}
+		
+		//handle additional configs
+		String aconfs = st.nextToken();
+		if( !inSpark )
+			parseAndSetAdditionalConfigurations( aconfs );
+		
+		//handle program
+		String progStr = st.nextToken();
+		Program prog = parseProgram( progStr, id ); 
+		
+		//handle result variable names
+		String rvarStr = st.nextToken();
+		ArrayList<ResultVar> rvars = parseResultVariables(rvarStr);
+		body.setResultVariables(rvars);
+		
+		//handle execution context
+		String ecStr = st.nextToken();
+		ExecutionContext ec = parseExecutionContext( ecStr, prog );
+		
+		//handle program blocks
+		String spbs = st.nextToken();
+		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
+		
+		body.setChildBlocks( pbs );
+		body.setEc( ec );
+		
+		return body;
+	}
+
+	private static void handleDMLConfig(String confStr) {
+		if(confStr != null && !confStr.trim().isEmpty()) {
+			DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr);
+			CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf);
+			ConfigurationManager.setLocalConfig(dmlconf);
+			ConfigurationManager.setLocalConfig(cconf);
+		}
+	}
+
+	public static Program parseProgram( String in, int id ) {
+		String lin = in.substring( PROG_BEGIN.length(),in.length()- PROG_END.length()).trim();
+		Program prog = new Program();
+		HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id);
+		for( Entry<String,FunctionProgramBlock> e : fc.entrySet() ) {
+			String[] keypart = e.getKey().split( Program.KEY_DELIM );
+			String namespace = keypart[0];
+			String name      = keypart[1];
+			prog.addFunctionProgramBlock(namespace, name, e.getValue());
+		}
+		return prog;
+	}
+
+	private static LocalVariableMap parseVariables(String in) {
+		LocalVariableMap ret = null;
+		if( in.length()> VARS_BEGIN.length() + VARS_END.length()) {
+			String varStr = in.substring( VARS_BEGIN.length(),in.length()- VARS_END.length()).trim();
+			ret = LocalVariableMap.deserialize(varStr);
+		}
+		else { //empty input symbol table
+			ret = new LocalVariableMap();
+		}
+		return ret;
+	}
+
+	private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) {
+		HashMap<String,FunctionProgramBlock> ret = new HashMap<>();
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM );
+		while( st.hasMoreTokens() ) {
+			String lvar  = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use
+			//put first copy into prog (for direct use)
+			int index = lvar.indexOf( KEY_VALUE_DELIM );
+			String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id;
+			String tmp2 = lvar.substring(index + 1);
+			ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
+		}
+		return ret;
+	}
+
+	private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) {
+		ArrayList<ProgramBlock> pbs = new ArrayList<>();
+		String tmpdata = in.substring(PBS_BEGIN.length(),in.length()- PBS_END.length());
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM);
+		while( st.hasMoreTokens() )
+			pbs.add( rParseProgramBlock( st.nextToken(), prog, id ) );
+		return pbs;
+	}
+
+	private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) {
+		ProgramBlock pb = null;
+		if( in.startsWith(PB_WHILE) )
+			pb = rParseWhileProgramBlock( in, prog, id );
+		else if ( in.startsWith(PB_FOR) )
+			pb = rParseForProgramBlock( in, prog, id );
+		else if ( in.startsWith(PB_PARFOR) )
+			pb = rParseParForProgramBlock( in, prog, id );
+		else if ( in.startsWith(PB_IF) )
+			pb = rParseIfProgramBlock( in, prog, id );
+		else if ( in.startsWith(PB_FC) )
+			pb = rParseFunctionProgramBlock( in, prog, id );
+		else if ( in.startsWith(PB_EFC) )
+			pb = rParseExternalFunctionProgramBlock( in, prog, id );
+ 		else if ( in.startsWith(PB_BEGIN) )
+			pb = rParseGenericProgramBlock( in, prog, id );
+		else 
+			throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in );
+		return pb;
+	}
+
+	private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) {
+		String lin = in.substring( PB_WHILE.length(),in.length()- PB_END.length());
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+		
+		//predicate instructions
+		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
+		
+		//exit instructions
+		ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
+		
+		//program blocks
+		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+		
+		WhileProgramBlock wpb = new WhileProgramBlock(prog,inst);
+		wpb.setExitInstructions2(exit);
+		wpb.setChildBlocks(pbs);
+		
+		return wpb;
+	}
+
+	private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) {
+		String lin = in.substring( PB_FOR.length(),in.length()- PB_END.length());
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+		
+		//inputs
+		String iterVar = st.nextToken();
+		
+		//instructions
+		ArrayList<Instruction> from = parseInstructions(st.nextToken(),id);
+		ArrayList<Instruction> to = parseInstructions(st.nextToken(),id);
+		ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id);
+		
+		//exit instructions
+		ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
+		
+		//program blocks
+		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+		
+		ForProgramBlock fpb = new ForProgramBlock(prog, iterVar);
+		fpb.setFromInstructions(from);
+		fpb.setToInstructions(to);
+		fpb.setIncrementInstructions(incr);
+		fpb.setExitInstructions(exit);
+		fpb.setChildBlocks(pbs);
+		
+		return fpb;
+	}
+
+	private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) {
+		String lin = in.substring( PB_PARFOR.length(),in.length()- PB_END.length());
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+		
+		//inputs
+		String iterVar = st.nextToken();
+		ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken());
+		HashMap<String,String> params = parseStringHashMap(st.nextToken());
+		
+		//instructions 
+		ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0);
+		ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0);
+		ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0);
+		
+		//exit instructions
+		ArrayList<Instruction> exit = parseInstructions(st.nextToken(), 0);
+		
+		//program blocks //reset id to preinit state, replaced during exec
+		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); 
+		
+		ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars);
+		pfpb.disableOptimization(); //already done in top-level parfor
+		pfpb.setFromInstructions(from);
+		pfpb.setToInstructions(to);
+		pfpb.setIncrementInstructions(incr);
+		pfpb.setExitInstructions(exit);
+		pfpb.setChildBlocks(pbs);
+		
+		return pfpb;
+	}
+
+	private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) {
+		String lin = in.substring( PB_IF.length(),in.length()- PB_END.length());
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+		
+		//predicate instructions
+		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
+		
+		//exit instructions
+		ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
+		
+		//program blocks: if and else
+		ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id);
+		ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id);
+		
+		IfProgramBlock ipb = new IfProgramBlock(prog,inst);
+		ipb.setExitInstructions2(exit);
+		ipb.setChildBlocksIfBody(pbs1);
+		ipb.setChildBlocksElseBody(pbs2);
+		
+		return ipb;
+	}
+
+	private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) {
+		String lin = in.substring( PB_FC.length(),in.length()- PB_END.length());
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+		
+		//inputs and outputs
+		ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
+		ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
+		
+		//instructions
+		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
+
+		//program blocks
+		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+
+		ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
+		ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
+		FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2);
+		fpb.setInstructions(inst);
+		fpb.setChildBlocks(pbs);
+		
+		return fpb;
+	}
+
+	/**
+	 * Parses a serialized external function program block: inputs, outputs,
+	 * name-value parameters, base directory, and child program blocks.
+	 * Always reconstructs the CP variant (see inline note).
+	 *
+	 * @param in serialized representation incl. PB_EFC prefix and PB_END suffix
+	 * @param prog parent program the reconstructed block is attached to
+	 * @param id worker/thread id used when re-binding instruction thread ids
+	 * @return reconstructed external function program block (CP)
+	 */
+	private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) {
+		String lin = in.substring( PB_EFC.length(),in.length()- PB_END.length());
+		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+		
+		//inputs, outputs and params
+		ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
+		ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
+		HashMap<String,String> dat3 = parseStringHashMap(st.nextToken());
+
+		//basedir
+		String basedir = st.nextToken();
+		
+		//instructions (required for removing INST BEGIN, END)
+		//note: parsed result intentionally discarded; token must still be consumed
+		parseInstructions(st.nextToken(),id);
+
+		//program blocks
+		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+
+		ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
+		ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
+		
+		//only CP external functions, because no nested MR jobs for reblocks
+		ExternalFunctionProgramBlockCP efpb = new ExternalFunctionProgramBlockCP(prog, tmp1, tmp2, dat3, basedir);
+		efpb.setChildBlocks(pbs);
+		
+		return efpb;
+	}
+
+	/**
+	 * Parses a generic (last-level) program block, which consists of a
+	 * single instruction list and no child blocks.
+	 *
+	 * @param in serialized representation incl. PB_BEGIN prefix and PB_END suffix
+	 * @param prog parent program the reconstructed block is attached to
+	 * @param id worker/thread id used when re-binding instruction thread ids
+	 * @return reconstructed generic program block
+	 */
+	private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) {
+		String lin = in.substring( PB_BEGIN.length(),in.length()- PB_END.length());
+		StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM);
+		ProgramBlock pb = new ProgramBlock(prog);
+		pb.setInstructions(parseInstructions(st.nextToken(),id));
+		return pb;
+	}
+
+	/**
+	 * Parses a serialized instruction list and rewrites the root thread id
+	 * to the child thread id of the given worker.
+	 *
+	 * @param in serialized instructions incl. INST_BEGIN prefix and INST_END suffix
+	 * @param id worker/thread id substituted into each parsed instruction
+	 * @return list of parsed CP instructions
+	 * @throws DMLRuntimeException if any single instruction fails to parse
+	 */
+	private static ArrayList<Instruction> parseInstructions( String in, int id ) {
+		ArrayList<Instruction> insts = new ArrayList<>();
+		String lin = in.substring( INST_BEGIN.length(),in.length()- INST_END.length());
+		StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM);
+		while(st.hasMoreTokens()) {
+			//Note that at this point only CP instructions and External function instruction can occur
+			String instStr = st.nextToken();
+			try {
+				Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr);
+				//replace _t0 (root thread) with _t<id> (child thread) in filenames etc.
+				tmpinst = saveReplaceThreadID(tmpinst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+id );
+				insts.add( tmpinst );
+			}
+			catch(Exception ex) {
+				//wrap with the offending instruction string for diagnosability
+				throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex);
+			}
+		}
+		return insts;
+	}
+	
+	/**
+	 * Parses a serialized list of result variables. A trailing '+' on a
+	 * variable name marks it as an accumulator and is stripped from the name.
+	 *
+	 * @param in serialized, delimiter-separated variable names
+	 * @return list of result variables with their accumulator flags
+	 */
+	private static ArrayList<ResultVar> parseResultVariables(String in) {
+		ArrayList<ResultVar> ret = new ArrayList<>();
+		for(String var : parseStringArrayList(in)) {
+			boolean accum = var.endsWith("+");
+			ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum));
+		}
+		return ret;
+	}
+
+	/**
+	 * Parses a serialized map of key=value entries separated by ELEMENT_DELIM.
+	 * Only the first KEY_VALUE_DELIM occurrence splits an entry, so values may
+	 * themselves contain the delimiter character.
+	 *
+	 * @param in serialized key-value entries
+	 * @return parsed map
+	 */
+	private static HashMap<String,String> parseStringHashMap( String in ) {
+		HashMap<String,String> vars = new HashMap<>();
+		StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
+		while( st.hasMoreTokens() ) {
+			String lin = st.nextToken();
+			int index = lin.indexOf( KEY_VALUE_DELIM );
+			String tmp1 = lin.substring(0, index);
+			String tmp2 = lin.substring(index + 1);
+			vars.put(tmp1, tmp2);
+		}
+		return vars;
+	}
+	
+	/** Parses a delimiter-separated string list using the default ELEMENT_DELIM. */
+	private static ArrayList<String> parseStringArrayList(String in) {
+		return parseStringArrayList(in, ELEMENT_DELIM);
+	}
+
+	/**
+	 * Parses a delimiter-separated string list with an explicit delimiter;
+	 * the result list is pre-sized via countTokens to avoid resizing.
+	 *
+	 * @param in serialized list
+	 * @param delim token delimiter
+	 * @return list of tokens in input order
+	 */
+	private static ArrayList<String> parseStringArrayList(String in, String delim) {
+		StringTokenizer st = new StringTokenizer(in, delim);
+		ArrayList<String> vars = new ArrayList<>(st.countTokens());
+		while( st.hasMoreTokens() )
+			vars.add(st.nextToken());
+		return vars;
+	}
+
+	/**
+	 * Parses a serialized list of data identifiers (ELEMENT_DELIM separated),
+	 * delegating per-element parsing to {@link #parseDataIdentifier(String)}.
+	 *
+	 * @param in serialized data identifier list
+	 * @return list of parsed data identifiers
+	 */
+	private static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) {
+		ArrayList<DataIdentifier> vars = new ArrayList<>();
+		StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
+		while( st.hasMoreTokens() ) {
+			String tmp = st.nextToken();
+			DataIdentifier dat = parseDataIdentifier( tmp );
+			vars.add(dat);
+		}
+		return vars;
+	}
+
+	/**
+	 * Parses a single data identifier serialized as
+	 * name DATA_FIELD_DELIM data-type DATA_FIELD_DELIM value-type.
+	 *
+	 * @param in serialized data identifier
+	 * @return parsed data identifier with data and value type set
+	 */
+	private static DataIdentifier parseDataIdentifier( String in ) {
+		StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM);
+		DataIdentifier dat = new DataIdentifier(st.nextToken());
+		dat.setDataType(DataType.valueOf(st.nextToken()));
+		dat.setValueType(ValueType.valueOf(st.nextToken()));
+		return dat;
+	}
+	
+	/**
+	 * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and
+	 * related symbol tables can be hierarchically structured.
+	 * 
+	 * Parses a serialized data object into a (name, Data) pair. Supported
+	 * data types are SCALAR (int/double/boolean/string), MATRIX (with full
+	 * meta data), and LIST (recursively parsed elements).
+	 * 
+	 * @param in data object as string
+	 * @return array of objects: [0] variable name (String), [1] parsed Data
+	 */
+	public static Object[] parseDataObject(String in) {
+		Object[] ret = new Object[2];
+	
+		StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM );
+		String name = st.nextToken();
+		DataType datatype = DataType.valueOf( st.nextToken() );
+		ValueType valuetype = ValueType.valueOf( st.nextToken() );
+		//value string may be absent (e.g., for list headers); default to empty
+		String valString = st.hasMoreTokens() ? st.nextToken() : "";
+		Data dat = null;
+		switch( datatype )
+		{
+			case SCALAR: {
+				switch ( valuetype ) {
+					case INT:     dat = new IntObject(Long.parseLong(valString)); break;
+					case DOUBLE:  dat = new DoubleObject(Double.parseDouble(valString)); break;
+					case BOOLEAN: dat = new BooleanObject(Boolean.parseBoolean(valString)); break;
+					case STRING:  dat = new StringObject(valString); break;
+					default:
+						throw new DMLRuntimeException("Unable to parse valuetype "+valuetype);
+				}
+				break;
+			}
+			case MATRIX: {
+				//valString holds the matrix file name; remaining tokens hold meta data
+				MatrixObject mo = new MatrixObject(valuetype,valString);
+				long rows = Long.parseLong( st.nextToken() );
+				long cols = Long.parseLong( st.nextToken() );
+				int brows = Integer.parseInt( st.nextToken() );
+				int bcols = Integer.parseInt( st.nextToken() );
+				long nnz = Long.parseLong( st.nextToken() );
+				InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() );
+				OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() );
+				PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() );
+				UpdateType inplace = UpdateType.valueOf( st.nextToken() );
+				MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz); 
+				MetaDataFormat md = new MetaDataFormat( mc, oin, iin );
+				mo.setMetaData( md );
+				//restore partitioning info only if actually partitioned
+				if( partFormat._dpf != PDataPartitionFormat.NONE )
+					mo.setPartitioned( partFormat._dpf, partFormat._N );
+				mo.setUpdateType(inplace);
+				dat = mo;
+				break;
+			}
+			case LIST:
+				int size = Integer.parseInt(st.nextToken());
+				//EMPTY marks an unnamed list (names == null)
+				String namesStr = st.nextToken();
+				List<String> names = namesStr.equals(EMPTY) ? null :
+					parseStringArrayList(namesStr, ELEMENT_DELIM2);
+				List<Data> data = new ArrayList<>(size);
+				//switch the tokenizer delimiter to the list-element delimiter
+				//(the returned token is intentionally discarded)
+				st.nextToken(LIST_ELEMENT_DELIM);
+				for (int i = 0; i < size; i++) {
+					String dataStr = st.nextToken();
+					//recursive parse; only the Data part ([1]) of the pair is kept
+					Object[] obj = parseDataObject(dataStr);
+					data.add((Data) obj[1]);
+				}
+				dat = new ListObject(data, names);
+				break;
+			default:
+				throw new DMLRuntimeException("Unable to parse datatype "+datatype);
+		}
+		
+		ret[0] = name;
+		ret[1] = dat;
+		return ret;
+	}
+
+	/**
+	 * Parses a serialized execution context (symbol table). Returns null if
+	 * the serialized payload equals EMPTY, i.e., no variables were shipped.
+	 *
+	 * @param in serialized context incl. EC_BEGIN prefix and EC_END suffix
+	 * @param prog program the new execution context is created for
+	 * @return execution context with restored variables, or null if empty
+	 */
+	private static ExecutionContext parseExecutionContext(String in, Program prog) {
+		ExecutionContext ec = null;
+		String lin = in.substring(EC_BEGIN.length(),in.length()- EC_END.length()).trim();
+		if( !lin.equals( EMPTY ) ) {
+			LocalVariableMap vars = parseVariables(lin);
+			ec = ExecutionContextFactory.createContext( false, prog );
+			ec.setVariables(vars);
+		}
+		return ec;
+	}
+	
+	/**
+	 * Parses shipped configuration flags and applies them to this JVM.
+	 * NOTE(review): assumes conf is a single "key=value" entry whose value is
+	 * the statistics flag (e.g., "stats=true") -- verify against the
+	 * corresponding serialization side if more flags are ever added.
+	 *
+	 * @param conf serialized configuration entry
+	 */
+	private static void parseAndSetAdditionalConfigurations(String conf) {
+		String[] statsFlag = conf.split("=");
+		DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]);
+	}
+
+	//////////
+	// CUSTOM SAFE LITERAL REPLACEMENT
+	
+	/**
+	 * In-place replacement of thread ids in filenames, functions names etc
+	 * 
+	 * @param inst instruction
+	 * @param pattern thread-id pattern to replace (e.g., the root thread id)
+	 * @param replacement string replacement (e.g., the child thread id)
+	 * @return the same instruction instance, possibly updated in place
+	 */
+	private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) {
+		//currently known, relevant instructions: createvar, rand, seq, extfunct, 
+		if( inst instanceof MRJobInstruction ) {
+			//update dims file, and internal string representations of rand/seq instructions
+			MRJobInstruction mrinst = (MRJobInstruction)inst;
+			mrinst.updateInstructionThreadID(pattern, replacement);
+		}
+		else if ( inst instanceof VariableCPInstruction ) { //createvar, setfilename
+			//update in-memory representation
+			inst.updateInstructionThreadID(pattern, replacement);
+		}
+		//NOTE> //Rand, seq in CP not required
+		return inst;
+	}
+	
+	/**
+	 * Replaces the last occurrence of the given thread-id pattern in a file
+	 * name; returns the input unchanged if the pattern does not occur.
+	 *
+	 * @param fname file name possibly containing a thread-id suffix
+	 * @param pattern thread-id pattern to replace
+	 * @param replace replacement string
+	 * @return file name with the last pattern occurrence replaced
+	 */
+	public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace) {
+		//save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path
+		//replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end.
+		int pos = fname.lastIndexOf(pattern);
+		return ( pos < 0 ) ? fname : fname.substring(0, pos)
+			+ replace + fname.substring(pos+pattern.length());
+	}
+	
+	
+	//////////
+	// CUSTOM HIERARCHICAL TOKENIZER
+	
+	
+	/**
+	 * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to
+	 * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy
+	 * levels are skipped.  
+	 * 
+	 * Nesting is tracked via the LEVELIN/LEVELOUT marker strings; only
+	 * delimiters at nesting depth zero split tokens.
+	 */
+	private static class HierarchyAwareStringTokenizer //extends StringTokenizer
+	{
+		private String _str = null; //remaining, not-yet-tokenized input
+		private String _del = null; //top-level delimiter string
+		private int    _off = -1;   //chars to skip past a found delimiter (0 for the last token)
+		
+		public HierarchyAwareStringTokenizer( String in, String delim ) {
+			//super(in);
+			_str = in;
+			_del = delim;
+			_off = delim.length();
+		}
+
+		public boolean hasMoreTokens() {
+			return (_str.length() > 0);
+		}
+
+		public String nextToken() {
+			int nextDelim = determineNextSameLevelIndexOf(_str, _del);
+			String token = null;
+			if(nextDelim < 0) {
+				//no further top-level delimiter: consume the rest as the last token
+				nextDelim = _str.length();
+				_off = 0;
+			}
+			token = _str.substring(0,nextDelim);
+			//advance past the token and (if present) the delimiter itself
+			_str = _str.substring( nextDelim + _off );
+			return token;
+		}
+		
+		/**
+		 * Returns the index of the next occurrence of pattern at nesting
+		 * depth zero, or a negative value if no such occurrence exists.
+		 */
+		private static int determineNextSameLevelIndexOf( String data, String pattern  )
+		{
+			String tmpdata = data;
+			int index      = 0; //absolute offset into data of tmpdata's start
+			int count      = 0; //current nesting depth (LEVELIN/LEVELOUT balance)
+			int off=0,i1,i2,i3,min;
+			
+			while(true) {
+				i1 = tmpdata.indexOf(pattern);
+				i2 = tmpdata.indexOf(LEVELIN);
+				i3 = tmpdata.indexOf(LEVELOUT);
+				
+				if( i1 < 0 ) return i1; //no pattern found at all
+				
+				min = i1; //min >= 0 by definition
+				if( i2 >= 0 ) min = Math.min(min, i2);
+				if( i3 >= 0 ) min = Math.min(min, i3);
+				
+				//stack maintenance
+				if( i1 == min && count == 0 )
+					return index+i1; //pattern is the nearest marker and we are at top level
+				else if( i2 == min ) {
+					count++;
+					off = LEVELIN.length();
+				}
+				else if( i3 == min ) {
+					count--;
+					off = LEVELOUT.length();
+				}
+			
+				//prune investigated string
+				index += min+off;
+				tmpdata = tmpdata.substring(min+off);
+			}
+		}
+	}
+}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java
new file mode 100644
index 0000000..d5fd509
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.junit.Test;
+
+/**
+ * Integration tests for the local paramserv builtin, covering BSP/ASP update
+ * modes, batch/epoch update frequencies, and the supported data partitioning
+ * schemes (dc = disjoint contiguous, drr = disjoint round robin,
+ * dr = disjoint random, or = overlap reshuffle) via one DML script each.
+ */
+public class ParamservLocalNNTest extends AutomatedTestBase {
+
+	private static final String TEST_NAME1 = "paramserv-nn-bsp-batch-dc";
+	private static final String TEST_NAME2 = "paramserv-nn-asp-batch";
+	private static final String TEST_NAME3 = "paramserv-nn-bsp-epoch";
+	private static final String TEST_NAME4 = "paramserv-nn-asp-epoch";
+	private static final String TEST_NAME5 = "paramserv-nn-bsp-batch-drr";
+	private static final String TEST_NAME6 = "paramserv-nn-bsp-batch-dr";
+	private static final String TEST_NAME7 = "paramserv-nn-bsp-batch-or";
+
+	private static final String TEST_DIR = "functions/paramserv/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + ParamservLocalNNTest.class.getSimpleName() + "/";
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {}));
+		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {}));
+		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {}));
+		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {}));
+		addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {}));
+	}
+
+	@Test
+	public void testParamservBSPBatchDisjointContiguous() {
+		runDMLTest(TEST_NAME1);
+	}
+
+	@Test
+	public void testParamservASPBatch() {
+		runDMLTest(TEST_NAME2);
+	}
+
+	@Test
+	public void testParamservBSPEpoch() {
+		runDMLTest(TEST_NAME3);
+	}
+
+	@Test
+	public void testParamservASPEpoch() {
+		runDMLTest(TEST_NAME4);
+	}
+
+	@Test
+	public void testParamservBSPBatchDisjointRoundRobin() {
+		runDMLTest(TEST_NAME5);
+	}
+
+	@Test
+	public void testParamservBSPBatchDisjointRandom() {
+		runDMLTest(TEST_NAME6);
+	}
+
+	@Test
+	public void testParamservBSPBatchOverlapReshuffle() {
+		runDMLTest(TEST_NAME7);
+	}
+
+	//runs the given DML script end-to-end without expected-result comparison
+	private void runDMLTest(String testname) {
+		TestConfiguration config = getTestConfiguration(testname);
+		loadTestConfiguration(config);
+		programArgs = new String[] { "-explain" };
+		String HOME = SCRIPT_DIR + TEST_DIR;
+		fullDMLScriptName = HOME + testname + ".dml";
+		runTest(true, false, null, null, -1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java
deleted file mode 100644
index d7afc9d..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.paramserv;
-
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.junit.Test;
-
-public class ParamservNNTest extends AutomatedTestBase {
-
-	private static final String TEST_NAME1 = "paramserv-nn-bsp-batch-dc";
-	private static final String TEST_NAME2 = "paramserv-nn-asp-batch";
-	private static final String TEST_NAME3 = "paramserv-nn-bsp-epoch";
-	private static final String TEST_NAME4 = "paramserv-nn-asp-epoch";
-	private static final String TEST_NAME5 = "paramserv-nn-bsp-batch-drr";
-	private static final String TEST_NAME6 = "paramserv-nn-bsp-batch-dr";
-	private static final String TEST_NAME7 = "paramserv-nn-bsp-batch-or";
-
-	private static final String TEST_DIR = "functions/paramserv/";
-	private static final String TEST_CLASS_DIR = TEST_DIR + ParamservNNTest.class.getSimpleName() + "/";
-
-	private final String HOME = SCRIPT_DIR + TEST_DIR;
-
-	@Override
-	public void setUp() {
-		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {}));
-		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {}));
-		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {}));
-		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {}));
-		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {}));
-		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {}));
-		addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {}));
-	}
-
-	@Test
-	public void testParamservBSPBatchDisjointContiguous() {
-		runDMLTest(TEST_NAME1);
-	}
-
-	@Test
-	public void testParamservASPBatch() {
-		runDMLTest(TEST_NAME2);
-	}
-
-	@Test
-	public void testParamservBSPEpoch() {
-		runDMLTest(TEST_NAME3);
-	}
-
-	@Test
-	public void testParamservASPEpoch() {
-		runDMLTest(TEST_NAME4);
-	}
-
-	@Test
-	public void testParamservBSPBatchDisjointRoundRobin() {
-		runDMLTest(TEST_NAME5);
-	}
-
-	@Test
-	public void testParamservBSPBatchDisjointRandom() {
-		runDMLTest(TEST_NAME6);
-	}
-
-	@Test
-	public void testParamservBSPBatchOverlapReshuffle() {
-		runDMLTest(TEST_NAME7);
-	}
-
-	private void runDMLTest(String testname) {
-		TestConfiguration config = getTestConfiguration(testname);
-		loadTestConfiguration(config);
-		programArgs = new String[] { "-explain" };
-		fullDMLScriptName = HOME + testname + ".dml";
-		runTest(true, false, null, null, -1);
-	}
-}


[3/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function shipping and worker setup

Posted by mb...@apache.org.
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
deleted file mode 100644
index 919b357..0000000
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ /dev/null
@@ -1,1866 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.CompilerConfig.ConfigType;
-import org.apache.sysml.conf.CompilerConfig;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.hops.Hop;
-import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.hops.recompile.Recompiler;
-import org.apache.sysml.parser.DMLProgram;
-import org.apache.sysml.parser.DataIdentifier;
-import org.apache.sysml.parser.ForStatementBlock;
-import org.apache.sysml.parser.IfStatementBlock;
-import org.apache.sysml.parser.ParForStatementBlock;
-import org.apache.sysml.parser.StatementBlock;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
-import org.apache.sysml.parser.WhileStatementBlock;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.codegen.CodegenUtils;
-import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock;
-import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP;
-import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock;
-import org.apache.sysml.runtime.controlprogram.IfProgramBlock;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.Program;
-import org.apache.sysml.runtime.controlprogram.ProgramBlock;
-import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.instructions.CPInstructionParser;
-import org.apache.sysml.runtime.instructions.Instruction;
-import org.apache.sysml.runtime.instructions.InstructionParser;
-import org.apache.sysml.runtime.instructions.MRJobInstruction;
-import org.apache.sysml.runtime.instructions.cp.BooleanObject;
-import org.apache.sysml.runtime.instructions.cp.CPInstruction;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.instructions.cp.DoubleObject;
-import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
-import org.apache.sysml.runtime.instructions.cp.IntObject;
-import org.apache.sysml.runtime.instructions.cp.ScalarObject;
-import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction;
-import org.apache.sysml.runtime.instructions.cp.StringObject;
-import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
-import org.apache.sysml.runtime.instructions.gpu.GPUInstruction;
-import org.apache.sysml.runtime.instructions.mr.MRInstruction;
-import org.apache.sysml.runtime.instructions.spark.SPInstruction;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.MetaDataFormat;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.udf.ExternalFunctionInvocationInstruction;
-
-/**
- * Program converter functionalities for 
- *   (1) creating deep copies of program blocks, instructions, function program blocks, and 
- *   (2) serializing and parsing of programs, program blocks, functions program blocks.
- * 
- */
-//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design)
-public class ProgramConverter 
-{
-	protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName());
-    
-	//use escaped unicodes for separators in order to prevent string conflict
-	public static final String NEWLINE           = "\n"; //System.lineSeparator();
-	public static final String COMPONENTS_DELIM  = "\u236e"; //semicolon w/ bar; ";";
-	public static final String ELEMENT_DELIM     = "\u236a"; //comma w/ bar; ",";
-	public static final String DATA_FIELD_DELIM  = "\u007c"; //"|";
-	public static final String KEY_VALUE_DELIM   = "\u003d"; //"=";
-	public static final String LEVELIN           = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{";
-	public static final String LEVELOUT          = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}";	
-	public static final String EMPTY             = "null";
-	
-	//public static final String CP_ROOT_THREAD_SEPARATOR = "/";//File.separator;
-	public static final String CP_ROOT_THREAD_ID = "_t0";       
-	public static final String CP_CHILD_THREAD   = "_t";
-		
-	public static final String PARFOR_CDATA_BEGIN = "<![CDATA[";
-	public static final String PARFOR_CDATA_END = " ]]>";
-	
-	public static final String PARFOR_PROG_BEGIN = " PROG" + LEVELIN;
-	public static final String PARFOR_PROG_END   = LEVELOUT;	
-	public static final String PARFORBODY_BEGIN  = PARFOR_CDATA_BEGIN+"PARFORBODY" + LEVELIN;
-	public static final String PARFORBODY_END    = LEVELOUT+PARFOR_CDATA_END;
-	public static final String PARFOR_VARS_BEGIN = "VARS: ";
-	public static final String PARFOR_VARS_END   = "";
-	public static final String PARFOR_PBS_BEGIN  = " PBS" + LEVELIN;
-	public static final String PARFOR_PBS_END    = LEVELOUT;
-	public static final String PARFOR_INST_BEGIN = " INST: ";
-	public static final String PARFOR_INST_END   = "";
-	public static final String PARFOR_EC_BEGIN   = " EC: ";
-	public static final String PARFOR_EC_END     = "";	
-	public static final String PARFOR_PB_BEGIN   = " PB" + LEVELIN;
-	public static final String PARFOR_PB_END     = LEVELOUT;
-	public static final String PARFOR_PB_WHILE   = " WHILE" + LEVELIN;
-	public static final String PARFOR_PB_FOR     = " FOR" + LEVELIN;
-	public static final String PARFOR_PB_PARFOR  = " PARFOR" + LEVELIN;
-	public static final String PARFOR_PB_IF      = " IF" + LEVELIN;
-	public static final String PARFOR_PB_FC      = " FC" + LEVELIN;
-	public static final String PARFOR_PB_EFC     = " EFC" + LEVELIN;
-	
-	public static final String PARFOR_CONF_STATS = "stats";
-	
-	
-	//exception msgs
-	public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " +
-			                                                       "(ExternalFunctionPRogramBlockCP can be used)";
-	public static final String NOT_SUPPORTED_MR_INSTRUCTION      = "Not supported: Instructions of type other than CP instructions";
-	public static final String NOT_SUPPORTED_MR_PARFOR           = "Not supported: Nested ParFOR REMOTE_MR due to possible deadlocks." +
-			                                                       "(LOCAL can be used for innner ParFOR)";
-	public static final String NOT_SUPPORTED_PB                  = "Not supported: type of program block";
-	
-	////////////////////////////////
-	// CREATION of DEEP COPIES
-	////////////////////////////////
-	
-	/**
-	 * Creates a deep copy of the given execution context.
-	 * For rt_platform=Hadoop, execution context has a symbol table.
-	 * 
-	 * @param ec execution context
-	 * @return execution context
-	 * @throws CloneNotSupportedException if CloneNotSupportedException occurs
-	 */
-	public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec) 
-		throws CloneNotSupportedException
-	{
-		ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram());
-		cpec.setVariables((LocalVariableMap) ec.getVariables().clone());
-	
-		//handle result variables with in-place update flag
-		//(each worker requires its own copy of the empty matrix object)
-		for( String var : cpec.getVariables().keySet() ) {
-			Data dat = cpec.getVariables().get(var);
-			if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) {
-				MatrixObject mo = (MatrixObject)dat;
-				MatrixObject moNew = new MatrixObject(mo); 
-				if( mo.getNnz() != 0 ){
-					// If output matrix is not empty (NNZ != 0), then local copy is created so that 
-					// update in place operation can be applied.
-					MatrixBlock mbVar = mo.acquireRead();
-					moNew.acquireModify (new MatrixBlock(mbVar));
-					mo.release();
-				} else {
-					//create empty matrix block w/ dense representation (preferred for update in-place)
-					//Creating a dense matrix block is valid because empty block not allocated and transfer 
-					// to sparse representation happens in left indexing in place operation.
-					moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false));
-				}
-				moNew.release();			
-				cpec.setVariable(var, moNew);
-			}
-		}
-		
-		return cpec;
-	}
-	
-	/**
-	 * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the
-	 * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order
-	 * to support arbitrary control-flow constructs within a parfor. 
-	 * 
-	 * @param childBlocks child program blocks
-	 * @param pid ?
-	 * @param IDPrefix ?
-	 * @param fnStack ?
-	 * @param fnCreated ?
-	 * @param plain if true, full deep copy without id replacement
-	 * @param forceDeepCopy if true, force deep copy
-	 * @return list of program blocks
-	 */
	public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) 
	{
		ArrayList<ProgramBlock> tmp = new ArrayList<>();
		
		for( ProgramBlock pb : childBlocks )
		{
			Program prog = pb.getProgram();
			ProgramBlock tmpPB = null;
			
			//dispatch on the concrete program block type; note that ParForProgramBlock
			//extends ForProgramBlock, hence the explicit exclusion in the for-branch
			if( pb instanceof WhileProgramBlock ) 
			{
				tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
			}
			else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
			{
				tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy );
			}
			else if( pb instanceof ParForProgramBlock )
			{
				ParForProgramBlock pfpb = (ParForProgramBlock) pb;
				//if nested parallelism is disabled, a nested parfor is copied as a plain for
				if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM )
					tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
				else 
					tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
			}				
			else if( pb instanceof IfProgramBlock )
			{
				tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
			}	
			else //last-level program block
			{
				tmpPB = new ProgramBlock(prog); // general case use for most PBs
				
				//for recompile in the master node JVM
				tmpPB.setStatementBlock(createStatementBlockCopy(pb.getStatementBlock(), pid, plain, forceDeepCopy)); 
				//tmpPB.setStatementBlock(pb.getStatementBlock()); 
				tmpPB.setThreadID(pid);
			}

			//copy instructions
			tmpPB.setInstructions( createDeepCopyInstructionSet(pb.getInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
			
			//copy symbol table
			//tmpPB.setVariables( pb.getVariables() ); //implicit cloning			
			
			tmp.add(tmpPB);
		}
		
		return tmp;
	}
-
-	public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
-		ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
-		WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst);
-		tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) );
-		tmpPB.setThreadID(pid);
-		
-		tmpPB.setExitInstructions2( createDeepCopyInstructionSet(wpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
-		tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
-		
-		return tmpPB;
-	}
-
-	public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
-		ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
-		IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst);
-		tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) );
-		tmpPB.setThreadID(pid);
-		
-		tmpPB.setExitInstructions2( createDeepCopyInstructionSet(ipb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
-		tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
-		tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
-		
-		return tmpPB;
-	}
-
-	public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
-		ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
-		tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy));
-		tmpPB.setThreadID(pid);
-		
-		tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
-		tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
-		tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
-		tmpPB.setExitInstructions( createDeepCopyInstructionSet(fpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
-		tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) );
-		
-		return tmpPB;
-	}
-
-	public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) {
-		ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
-		
-		tmpPB.setFromInstructions( fpb.getFromInstructions() );
-		tmpPB.setToInstructions( fpb.getToInstructions() );
-		tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() );
-		tmpPB.setExitInstructions( fpb.getExitInstructions() );
-		tmpPB.setChildBlocks( fpb.getChildBlocks() );
-		
-		return tmpPB;
-	}
-
	/**
	 * Creates a deep copy of a parfor program block. Loop-control and exit
	 * instructions are always deep copied; the child blocks are only deep
	 * copied if plain or forceDeepCopy is set (see note below).
	 */
	public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
		ParForProgramBlock tmpPB = null;
		
		if( IDPrefix == -1 ) //still on master node
			tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
		else //child of remote ParWorker at any level
			tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
		
		tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
		tmpPB.setThreadID(pid);
		
		tmpPB.disableOptimization(); //already done in top-level parfor
		tmpPB.disableMonitorReport(); //already done in top-level parfor
		
		//deep copy all loop-control and exit instruction sets
		tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setExitInstructions( createDeepCopyInstructionSet(pfpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );

		//NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway 
		//and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested.
		if( plain || forceDeepCopy )
			tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); 
		else
			tmpPB.setChildBlocks( pfpb.getChildBlocks() );
		
		return tmpPB;
	}
-	
	/**
	 * This creates a deep copy of a function program block. The central reference to singletons of function program blocks
	 * poses the need for explicit copies in order to prevent conflicting writes of temporary variables (see ExternalFunctionProgramBlock.
	 * 
	 * @param namespace function namespace
	 * @param oldName original function name
	 * @param pid thread id; appended to the new function name unless plain
	 * @param IDPrefix parworker id prefix (-1 while still on the master node)
	 * @param prog runtime program
	 * @param fnStack keys of functions currently being copied (guard against recursive function calls)
	 * @param fnCreated set collecting the keys of all newly created function copies
	 * @param plain if true, the original function name is preserved
	 */
	public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain) 
	{
		//fpb guaranteed to be non-null (checked inside getFunctionProgramBlock)
		FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName);
		String fnameNew = (plain)? oldName :(oldName+CP_CHILD_THREAD+pid); 
		String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew);

		if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) )
			return; //prevent redundant deep copy if already existent
		
		//create deep copy
		FunctionProgramBlock copy = null;
		ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); 
		ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); 
		if( fpb.getInputParams()!= null )
			tmp1.addAll(fpb.getInputParams());
		if( fpb.getOutputParams()!= null )
			tmp2.addAll(fpb.getOutputParams());
		
		//external functions: copy with a thread-specific base directory
		//(CP-specialized external functions checked first, as a subtype)
		if( fpb instanceof ExternalFunctionProgramBlockCP )
		{
			ExternalFunctionProgramBlockCP efpb = (ExternalFunctionProgramBlockCP) fpb;
			HashMap<String,String> tmp3 = efpb.getOtherParams();		
			if( IDPrefix!=-1 )
				copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_CHILD_THREAD+IDPrefix,CP_CHILD_THREAD+pid));
			else	
				copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_ROOT_THREAD_ID,CP_CHILD_THREAD+pid));
		}
		else if( fpb instanceof ExternalFunctionProgramBlock )
		{
			ExternalFunctionProgramBlock efpb = (ExternalFunctionProgramBlock) fpb;
			HashMap<String,String> tmp3 = efpb.getOtherParams();
			if( IDPrefix!=-1 )
				copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_CHILD_THREAD+IDPrefix, CP_CHILD_THREAD+pid));
			else	
				copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_ROOT_THREAD_ID, CP_CHILD_THREAD+pid));
		}
		else
		{
			//regular functions: recursive deep copy with fnStack as cycle guard
			if( !fnStack.contains(fnameNewKey) ) {
				fnStack.add(fnameNewKey);
				copy = new FunctionProgramBlock(prog, tmp1, tmp2);
				copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
				copy.setRecompileOnce( fpb.isRecompileOnce() );
				copy.setThreadID(pid);
				fnStack.remove(fnameNewKey);
			}
			else //stop deep copy for recursive function calls
				copy = fpb;
		}
		
		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
		//note: instructions not used by function program block
		
		//put 
		prog.addFunctionProgramBlock(namespace, fnameNew, copy);
		fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
	}
-
-	public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, HashSet<String> fnStack, HashSet<String> fnCreated) 
-	{
-		if( fpb == null )
-			throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock.");
-	
-		//create deep copy
-		FunctionProgramBlock copy = null;
-		ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); 
-		ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); 
-		if( fpb.getInputParams()!= null )
-			tmp1.addAll(fpb.getInputParams());
-		if( fpb.getOutputParams()!= null )
-			tmp2.addAll(fpb.getOutputParams());
-		
-		copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
-		copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) );
-		copy.setStatementBlock( fpb.getStatementBlock() );
-		copy.setRecompileOnce(fpb.isRecompileOnce());
-		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
-		//note: instructions not used by function program block
-	
-		return copy;
-	}
-
-	
	/**
	 * Creates a deep copy of an array of instructions and replaces the placeholders of parworker
	 * IDs with the concrete IDs of this parfor instance. This is a helper method uses for generating
	 * deep copies of program blocks.
	 * 
	 * @param instSet list of instructions
	 * @param pid thread id used to replace parworker id placeholders
	 * @param IDPrefix parworker id prefix, forwarded to function deep copies
	 * @param prog runtime program
	 * @param fnStack keys of functions currently being copied (recursion guard)
	 * @param fnCreated set collecting the keys of newly created function copies
	 * @param plain if true, function call names are preserved as-is
	 * @param cpFunctions if true, called functions are deep copied as well
	 * @return list of instructions
	 */
	public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean cpFunctions) 
	{
		ArrayList<Instruction> tmp = new ArrayList<>();
		for( Instruction inst : instSet )
		{
			//deep copy called functions before cloning the call instruction itself
			if( inst instanceof FunctionCallCPInstruction && cpFunctions )
			{
				FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst;
				createDeepCopyFunctionProgramBlock( finst.getNamespace(),
						                            finst.getFunctionName(), 
						                            pid, IDPrefix, prog, fnStack, fnCreated, plain );
			}
			
			tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) );
		}
		
		return tmp;
	}
-
	/**
	 * Clones a single instruction: CP/SP/MR/GPU instructions are cloned by
	 * reparsing their string representation (with function call names rewritten
	 * to their thread-local copies unless plain); MR job instructions are cloned
	 * via copy constructor. Finally, root thread id placeholders are replaced
	 * with the given pid.
	 * 
	 * @param oInst instruction to clone
	 * @param pid thread id used for function renaming and placeholder replacement
	 * @param plain if true, function call names are preserved as-is
	 * @param cpFunctions if true, function call instructions are renamed to their per-thread copies
	 * @return cloned instruction
	 */
	public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions ) 
	{
		Instruction inst = null;
		String tmpString = oInst.toString();
		
		try
		{
			if( oInst instanceof CPInstruction || oInst instanceof SPInstruction || oInst instanceof MRInstruction 
					|| oInst instanceof GPUInstruction )
			{
				if( oInst instanceof FunctionCallCPInstruction && cpFunctions )
				{
					FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst;
					if( !plain )
					{
						//safe replacement because target variables might include the function name
						//note: this is no update-in-place in order to keep the original function name as basis
						tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + CP_CHILD_THREAD+pid);
					}
					//otherwise: preserve function name
				}
				
				//clone by reparsing the (possibly rewritten) instruction string
				inst = InstructionParser.parseSingleInstruction(tmpString);
			}
			else if( oInst instanceof MRJobInstruction )
			{
				//clone via copy constructor
				inst = new MRJobInstruction( (MRJobInstruction)oInst );
			}
			else
				throw new DMLRuntimeException("Failed to clone instruction: "+oInst);
		}
		catch(Exception ex)
		{
			throw new DMLRuntimeException(ex);
		}
		
		//save replacement of thread id references in instructions
		inst = saveReplaceThreadID( inst, ProgramConverter.CP_ROOT_THREAD_ID, 
				                          ProgramConverter.CP_CHILD_THREAD+pid);
		
		return inst;
	}
-
	/**
	 * Creates a copy of a generic statement block: a new block with a deep copy
	 * of its hops dag if parallel dynamic recompilation is enabled and the block
	 * requires recompilation (or forceDeepCopy is set); otherwise the given
	 * statement block is returned as-is.
	 * 
	 * @param sb statement block (may be null, in which case it is returned as-is)
	 * @param pid thread id used to update function names in the copied hops (unless plain)
	 * @param plain if true, function names in the copied hops are preserved
	 * @param forceDeepCopy if true, force a deep copy even if no recompilation is required
	 * @return copied (or original) statement block
	 */
	public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) 
	{
		StatementBlock ret = null;
		
		try
		{
			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
				&& sb != null  //forced deep copy for function recompilation
				&& (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy)  )
			{
				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
				ret = new StatementBlock();
				ret.setDMLProg(sb.getDMLProg());
				ret.setParseInfo(sb);
				ret.setLiveIn( sb.liveIn() ); 
				ret.setLiveOut( sb.liveOut() ); 
				ret.setUpdatedVariables( sb.variablesUpdated() );
				ret.setReadVariables( sb.variablesRead() );
				
				//deep copy hops dag for concurrent recompile
				ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() );
				if( !plain )
					Recompiler.updateFunctionNames( hops, pid );
				ret.setHops( hops );
				ret.updateRecompilationFlag();
			}
			else
			{
				ret = sb;
			}
		}
		catch( Exception ex )
		{
			throw new DMLRuntimeException( ex );
		}
		
		return ret;
	}
-
-	public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) 
-	{
-		IfStatementBlock ret = null;
-		
-		try
-		{
-			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
-				&& sb != null //forced deep copy for function recompile
-				&& (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy)  )
-			{
-				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
-				ret = new IfStatementBlock();
-				ret.setDMLProg(sb.getDMLProg());
-				ret.setParseInfo(sb);
-				ret.setLiveIn( sb.liveIn() );
-				ret.setLiveOut( sb.liveOut() );
-				ret.setUpdatedVariables( sb.variablesUpdated() );
-				ret.setReadVariables( sb.variablesRead() );
-				
-				//shallow copy child statements
-				ret.setStatements( sb.getStatements() );
-				
-				//deep copy predicate hops dag for concurrent recompile
-				Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
-				ret.setPredicateHops( hops );
-				ret.updatePredicateRecompilationFlag();
-			}
-			else
-			{
-				ret = sb;
-			}
-		}
-		catch( Exception ex )
-		{
-			throw new DMLRuntimeException( ex );
-		}
-		
-		return ret;
-	}
-
-	public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) 
-	{
-		WhileStatementBlock ret = null;
-		
-		try
-		{
-			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
-				&& sb != null  //forced deep copy for function recompile
-				&& (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy)  )
-			{
-				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
-				ret = new WhileStatementBlock();
-				ret.setDMLProg(sb.getDMLProg());
-				ret.setParseInfo(sb);
-				ret.setLiveIn( sb.liveIn() );
-				ret.setLiveOut( sb.liveOut() );
-				ret.setUpdatedVariables( sb.variablesUpdated() );
-				ret.setReadVariables( sb.variablesRead() );
-				ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
-				
-				//shallow copy child statements
-				ret.setStatements( sb.getStatements() );
-				
-				//deep copy predicate hops dag for concurrent recompile
-				Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
-				ret.setPredicateHops( hops );
-				ret.updatePredicateRecompilationFlag();
-			}
-			else
-			{
-				ret = sb;
-			}
-		}
-		catch( Exception ex )
-		{
-			throw new DMLRuntimeException( ex );
-		}
-		
-		return ret;
-	}
-
	/**
	 * Creates a copy of a for (or parfor) statement block: a new block with deep
	 * copies of the from/to/increment hops dags that require recompilation, if
	 * parallel dynamic recompilation is enabled and any of them requires
	 * recompilation (or forceDeepCopy is set); otherwise the given statement
	 * block is returned as-is.
	 * 
	 * @param sb for statement block (may be null, in which case it is returned as-is)
	 * @param pid thread id (currently unused here; kept for signature consistency with the other copy methods)
	 * @param plain currently unused here; kept for signature consistency
	 * @param forceDeepCopy if true, force a deep copy even if no recompilation is required
	 * @return copied (or original) for statement block
	 */
	public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) 
	{
		ForStatementBlock ret = null;
		
		try
		{
			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) 
				&& sb != null 
				&& ( Recompiler.requiresRecompilation(sb.getFromHops()) ||
					 Recompiler.requiresRecompilation(sb.getToHops()) ||
					 Recompiler.requiresRecompilation(sb.getIncrementHops()) ||
					 forceDeepCopy )  )
			{
				//preserve the parfor subtype of the statement block
				ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock();
				
				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
				ret.setDMLProg(sb.getDMLProg());
				ret.setParseInfo(sb);
				ret.setLiveIn( sb.liveIn() );
				ret.setLiveOut( sb.liveOut() );
				ret.setUpdatedVariables( sb.variablesUpdated() );
				ret.setReadVariables( sb.variablesRead() );
				ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
				
				//shallow copy child statements
				ret.setStatements( sb.getStatements() );
				
				//deep copy predicate hops dag for concurrent recompile
				//(only the dags that actually require recompilation are copied)
				if( sb.requiresFromRecompilation() ){
					Hop hops = Recompiler.deepCopyHopsDag( sb.getFromHops() );
					ret.setFromHops( hops );	
				}
				if( sb.requiresToRecompilation() ){
					Hop hops = Recompiler.deepCopyHopsDag( sb.getToHops() );
					ret.setToHops( hops );	
				}
				if( sb.requiresIncrementRecompilation() ){
					Hop hops = Recompiler.deepCopyHopsDag( sb.getIncrementHops() );
					ret.setIncrementHops( hops );	
				}
				ret.updatePredicateRecompilationFlags();
			}
			else
			{
				ret = sb;
			}
		}
		catch( Exception ex )
		{
			throw new DMLRuntimeException( ex );
		}
		
		return ret;
	}
-	
-	
-	////////////////////////////////
-	// SERIALIZATION 
-	////////////////////////////////	
-
-	public static String serializeParForBody( ParForBody body ) {
-		return serializeParForBody(body, new HashMap<String, byte[]>());
-	}	
-	
	/**
	 * Serializes a parfor body into a single string for shipping to remote
	 * workers. The serialized components appear in this fixed order: DMLScript
	 * UUID, DML config, additional configuration flags, function program blocks,
	 * result variable names, execution context (top-level symbol table), and
	 * finally the program blocks (instructions only, no variables).
	 * 
	 * @param body parfor body (child blocks, result variables, execution context)
	 * @param clsMap output map collecting generated class bytecode to ship (e.g. codegen operators)
	 * @return serialized parfor body
	 */
	public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) 
	{
		ArrayList<ProgramBlock> pbs = body.getChildBlocks();
		ArrayList<ResultVar> rVnames = body.getResultVariables();
		ExecutionContext ec = body.getEc();
		
		//shortcut for empty bodies
		if( pbs.isEmpty() )
			return PARFORBODY_BEGIN + PARFORBODY_END;
		
		Program prog = pbs.get( 0 ).getProgram();
		
		StringBuilder sb = new StringBuilder();
		sb.append( PARFORBODY_BEGIN );
		sb.append( NEWLINE );
		
		//handle DMLScript UUID (propagate original uuid for writing to scratch space)
		sb.append( DMLScript.getUUID() );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle DML config
		sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle additional configurations
		sb.append( PARFOR_CONF_STATS + "=" + DMLScript.STATISTICS );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle program
		sb.append( PARFOR_PROG_BEGIN );
		sb.append( NEWLINE );
		sb.append( serializeProgram(prog, pbs, clsMap) );
		sb.append( PARFOR_PROG_END );
		sb.append( NEWLINE );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle result variable names
		sb.append( serializeResultVariables(rVnames) );
		sb.append( COMPONENTS_DELIM );
		
		//handle execution context
		//note: this includes also the symbol table (serialize only the top-level variable map,
		//      (symbol tables for nested/child blocks are created at parse time, on the remote side)
		sb.append( PARFOR_EC_BEGIN );
		sb.append( serializeExecutionContext(ec) );
		sb.append( PARFOR_EC_END );
		sb.append( NEWLINE );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle program blocks -- ONLY instructions, not variables.
		sb.append( PARFOR_PBS_BEGIN );
		sb.append( NEWLINE );
		sb.append( rSerializeProgramBlocks(pbs, clsMap) );
		sb.append( PARFOR_PBS_END );
		sb.append( NEWLINE );
		
		sb.append( PARFORBODY_END );
		
		return sb.toString();
	}
-
-	private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) {
-		//note program contains variables, programblocks and function program blocks 
-		//but in order to avoid redundancy, we only serialize function program blocks
-		HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks();
-		HashSet<String> cand = new HashSet<>();
-		rFindSerializationCandidates(pbs, cand);
-		return rSerializeFunctionProgramBlocks( fpb, cand, clsMap );
-	}
-
	/**
	 * Recursively collects the keys of all functions transitively called from
	 * the given program blocks into cand. The set also serves as memoization
	 * to handle multiple calls and recursive functions.
	 * 
	 * @param pbs program blocks to scan
	 * @param cand output set of function keys (namespace-qualified)
	 */
	private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) 
	{
		for( ProgramBlock pb : pbs )
		{
			if( pb instanceof WhileProgramBlock )
			{
				WhileProgramBlock wpb = (WhileProgramBlock) pb;
				rFindSerializationCandidates(wpb.getChildBlocks(), cand );
			}
			else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock )
			{
				ForProgramBlock fpb = (ForProgramBlock) pb; 
				rFindSerializationCandidates(fpb.getChildBlocks(), cand);
			}				
			else if ( pb instanceof IfProgramBlock )
			{
				IfProgramBlock ipb = (IfProgramBlock) pb;
				rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand);
				if( ipb.getChildBlocksElseBody() != null )
					rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand);
			}
			else //all generic program blocks
			{
				//inspect function call instructions of last-level blocks
				for( Instruction inst : pb.getInstructions() )
					if( inst instanceof FunctionCallCPInstruction )
					{
						FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst;
						String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName());
						if( !cand.contains(fkey) ) //memoization for multiple calls, recursion
						{
							cand.add( fkey ); //add to candidates
							
							//investigate chains of function calls
							FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName());
							rFindSerializationCandidates(fpb.getChildBlocks(), cand);
						}
					}
			}
		}
	}
-
-	private static String serializeVariables (LocalVariableMap vars) {
-		StringBuilder sb = new StringBuilder();
-		sb.append( PARFOR_VARS_BEGIN );
-		sb.append( vars.serialize() );
-		sb.append( PARFOR_VARS_END );
-		return sb.toString();
-	}
-
	/**
	 * Serializes a single data object (scalar or matrix) into one line.
	 * SCHEMA: {@code <name>|<datatype>|<valuetype>|value[|<matrix meta data>]}
	 * Scalars are serialized by value; matrices by file name plus nine meta
	 * data fields (dims, block sizes, nnz, input/output info, partition format,
	 * update type).
	 * 
	 * @param key variable name used as the serialized name
	 * @param dat data object to serialize
	 * @return serialized data object
	 */
	public static String serializeDataObject(String key, Data dat) 
	{
		// SCHEMA: <name>|<datatype>|<valuetype>|value
		// (scalars are serialize by value, matrices by filename)
		StringBuilder sb = new StringBuilder();
		//prepare data for serialization
		String name = key;
		DataType datatype = dat.getDataType();
		ValueType valuetype = dat.getValueType();
		String value = null;
		String[] matrixMetaData = null;
		switch( datatype )
		{
			case SCALAR:
				ScalarObject so = (ScalarObject) dat;
				//name = so.getName();
				value = so.getStringValue();
				break;
			case MATRIX:
				MatrixObject mo = (MatrixObject) dat;
				MetaDataFormat md = (MetaDataFormat) dat.getMetaData();
				MatrixCharacteristics mc = md.getMatrixCharacteristics();
				value = mo.getFileName();
				PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat(
						mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE;
				matrixMetaData = new String[9];
				matrixMetaData[0] = String.valueOf( mc.getRows() );
				matrixMetaData[1] = String.valueOf( mc.getCols() );
				matrixMetaData[2] = String.valueOf( mc.getRowsPerBlock() );
				matrixMetaData[3] = String.valueOf( mc.getColsPerBlock() );
				matrixMetaData[4] = String.valueOf( mc.getNonZeros() );
				matrixMetaData[5] = InputInfo.inputInfoToString( md.getInputInfo() );
				matrixMetaData[6] = OutputInfo.outputInfoToString( md.getOutputInfo() );
				matrixMetaData[7] = String.valueOf( partFormat );
				matrixMetaData[8] = String.valueOf( mo.getUpdateType() );
				break;
			default:
				throw new DMLRuntimeException("Unable to serialize datatype "+datatype);
		}
		
		//serialize data
		sb.append(name);
		sb.append(DATA_FIELD_DELIM);
		sb.append(datatype);
		sb.append(DATA_FIELD_DELIM);
		sb.append(valuetype);
		sb.append(DATA_FIELD_DELIM);
		sb.append(value);		
		if( matrixMetaData != null )
			for( int i=0; i<matrixMetaData.length; i++ )
			{
				sb.append(DATA_FIELD_DELIM);
				sb.append(matrixMetaData[i]);
			}
		
		return sb.toString();
	}
-
-	private static String serializeExecutionContext( ExecutionContext ec ) {
-		return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY;
-	}
-
	/**
	 * Serializes a list of instructions, ELEMENT_DELIM-separated. Only CP and
	 * external function invocation instructions are allowed; for codegen
	 * (Spoof) instructions, the generated operator class bytecode is added to
	 * clsMap for shipping. Literals are sanitized via checkAndReplaceLiterals.
	 * 
	 * @param inst instructions to serialize
	 * @param clsMap output map collecting generated class bytecode
	 * @return serialized instructions
	 */
	@SuppressWarnings("all")
	private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) 
	{
		StringBuilder sb = new StringBuilder();
		int count = 0;
		for( Instruction linst : inst )
		{
			//check that only cp instruction are transmitted 
			if( !( linst instanceof CPInstruction || linst instanceof ExternalFunctionInvocationInstruction ) )
				throw new DMLRuntimeException( NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst );
			
			//obtain serialized version of generated classes
			if( linst instanceof SpoofCPInstruction ) {
				Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass();
				clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName()));
			}
			
			if( count > 0 )
				sb.append( ELEMENT_DELIM );
			
			sb.append( checkAndReplaceLiterals( linst.toString() ) );
			count++;
		}
		
		return sb.toString();	
	}
-	
-	/**
-	 * Replacement of internal delimiters occurring in literals of instructions
-	 * in order to ensure robustness of serialization and parsing.
-	 * (e.g. print( "a,b" ) would break the parsing of instruction that internally
-	 * are separated with a "," )
-	 * 
-	 * @param instStr instruction string
-	 * @return instruction string with replacements
-	 */
-	private static String checkAndReplaceLiterals( String instStr )
-	{
-		String tmp = instStr;
-		
-		//1) check own delimiters (very unlikely due to special characters)
-		if( tmp.contains(COMPONENTS_DELIM) ) {
-			tmp = tmp.replaceAll(COMPONENTS_DELIM, ".");
-			LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'");
-		}
-		
-		if( tmp.contains(ELEMENT_DELIM) ) {
-			tmp = tmp.replaceAll(ELEMENT_DELIM, ".");
-			LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'");
-		}
-			
-		if( tmp.contains( LEVELIN ) ){
-			tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex
-			LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('");
-		}
-
-		if( tmp.contains(LEVELOUT) ){
-			tmp = tmp.replaceAll(LEVELOUT, ")");
-			LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'");
-		}
-		
-		//NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required
-		//because those literals cannot occur in critical places.
-		
-		//2) check end tag of CDATA
-		if( tmp.contains(PARFOR_CDATA_END) ){
-			tmp = tmp.replaceAll(PARFOR_CDATA_END, "."); //prevent XML parsing issues in job.xml
-			LOG.warn("Replaced special literal character sequence "+PARFOR_CDATA_END+" with '.'");
-		}
-		
-		return tmp;
-	}
-
-	private static String serializeStringHashMap( HashMap<String,String> vars)
-	{
-		StringBuilder sb = new StringBuilder();
-		int count=0;
-		for( Entry<String,String> e : vars.entrySet() )
-		{
-			if(count>0)
-				sb.append( ELEMENT_DELIM );
-			sb.append( e.getKey() );
-			sb.append( KEY_VALUE_DELIM );
-			sb.append( e.getValue() );
-			count++;
-		}
-		return sb.toString();
-	}
-
-	public static String serializeStringCollection( Collection<String> set)
-	{
-		StringBuilder sb = new StringBuilder();
-		int count=0;
-		for( String s : set )
-		{
-			if(count>0)
-				sb.append( ", " );
-			sb.append( s );
-			count++;
-		}
-		return sb.toString();
-	}
-
-	public static String serializeResultVariables( ArrayList<ResultVar> vars) {
-		StringBuilder sb = new StringBuilder();
-		int count=0;
-		for( ResultVar var : vars ) {
-			if(count>0)
-				sb.append( ELEMENT_DELIM );
-			sb.append( var._isAccum ? var._name+"+" : var._name );
-			count++;
-		}
-		return sb.toString();
-	}
-	
-	public static String serializeStringArrayList( ArrayList<String> vars)
-	{
-		StringBuilder sb = new StringBuilder();
-		int count=0;
-		for( String s : vars )
-		{
-			if(count>0)
-				sb.append( ELEMENT_DELIM );
-			sb.append( s );
-			count++;
-		}
-		return sb.toString();
-	}
-
-	private static String serializeDataIdentifiers( ArrayList<DataIdentifier> var)
-	{
-		StringBuilder sb = new StringBuilder();
-		int count=0;
-		for( DataIdentifier dat : var )
-		{
-			if(count>0)
-				sb.append( ELEMENT_DELIM );
-			sb.append( serializeDataIdentifier(dat) );
-			count++;
-		}
-		return sb.toString();
-	}
-
-	private static String serializeDataIdentifier( DataIdentifier dat ) {
-		// SCHEMA: <name>|<datatype>|<valuetype>
-		StringBuilder sb = new StringBuilder();
-		sb.append(dat.getName());
-		sb.append(DATA_FIELD_DELIM);
-		sb.append(dat.getDataType());
-		sb.append(DATA_FIELD_DELIM);
-		sb.append(dat.getValueType());
-		
-		return sb.toString();
-	}
-
	/**
	 * Serializes the given function program blocks (restricted to the candidate
	 * keys reachable from the parfor body), ELEMENT_DELIM/NEWLINE-separated as
	 * {@code key KEY_VALUE_DELIM serialized-block} entries.
	 * 
	 * @param pbs all function program blocks of the program, by function key
	 * @param cand keys of functions to include (serialization candidates)
	 * @param clsMap output map collecting generated class bytecode
	 * @return serialized function program blocks
	 */
	private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) 
	{
		StringBuilder sb = new StringBuilder();
		
		int count = 0;
		for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() )
		{
			if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body
				continue;
				
			if( count>0 ) {
			   sb.append( ELEMENT_DELIM );
			   sb.append( NEWLINE );
			}
			sb.append( pb.getKey() );
			sb.append( KEY_VALUE_DELIM );
			sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) );
			count++;
		}
		sb.append(NEWLINE);
		return sb.toString();
	}
-
-	private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) {
-		StringBuilder sb = new StringBuilder();
-		int count = 0;
-		for( ProgramBlock pb : pbs )
-		{
-			if( count>0 )
-			{
-			   sb.append( ELEMENT_DELIM );
-			   sb.append(NEWLINE);
-			}
-			sb.append( rSerializeProgramBlock(pb, clsMap) );
-			count++;
-		}
-
-		return sb.toString();
-	}
-
-	private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) {
-		StringBuilder sb = new StringBuilder();
-		
-		//handle header
-		if( pb instanceof WhileProgramBlock ) 
-			sb.append( PARFOR_PB_WHILE );
-		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
-			sb.append( PARFOR_PB_FOR );
-		else if ( pb instanceof ParForProgramBlock )
-			sb.append( PARFOR_PB_PARFOR );
-		else if ( pb instanceof IfProgramBlock )
-			sb.append( PARFOR_PB_IF );
-		else if ( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
-			sb.append( PARFOR_PB_FC );
-		else if ( pb instanceof ExternalFunctionProgramBlock )
-			sb.append( PARFOR_PB_EFC );
-		else //all generic program blocks
-			sb.append( PARFOR_PB_BEGIN );
-		
-		//handle body
-		if( pb instanceof WhileProgramBlock )
-		{
-			WhileProgramBlock wpb = (WhileProgramBlock) pb;
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) );
-			sb.append( PARFOR_INST_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) );
-			sb.append( PARFOR_INST_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_PBS_BEGIN );
-			sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) );
-			sb.append( PARFOR_PBS_END );
-		}
-		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) )
-		{
-			ForProgramBlock fpb = (ForProgramBlock) pb; 
-			sb.append( fpb.getIterVar() );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) );
-			sb.append( PARFOR_INST_END );	
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );	
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(fpb.getExitInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_PBS_BEGIN );
-			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
-			sb.append( PARFOR_PBS_END );
-		}
-		else if ( pb instanceof ParForProgramBlock )
-		{	
-			ParForProgramBlock pfpb = (ParForProgramBlock) pb; 
-			
-			//check for nested remote ParFOR
-			if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_MR )
-				throw new DMLRuntimeException( NOT_SUPPORTED_MR_PARFOR );
-			
-			sb.append( pfpb.getIterVar() );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( serializeResultVariables( pfpb.getResultVariables()) );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );	
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );	
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );	
-			sb.append( COMPONENTS_DELIM );	
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(pfpb.getExitInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_PBS_BEGIN );
-			sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) );
-			sb.append( PARFOR_PBS_END );
-		}				
-		else if ( pb instanceof IfProgramBlock )
-		{
-			IfProgramBlock ipb = (IfProgramBlock) pb;
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(ipb.getPredicate(), clsMap) );
-			sb.append( PARFOR_INST_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_PBS_BEGIN );
-			sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) );
-			sb.append( PARFOR_PBS_END );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_PBS_BEGIN );
-			sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) );
-			sb.append( PARFOR_PBS_END );
-		}
-		else if( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
-		{
-			FunctionProgramBlock fpb = (FunctionProgramBlock) pb;
-			
-			sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(fpb.getInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );	
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_PBS_BEGIN );
-			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
-			sb.append( PARFOR_PBS_END );
-			sb.append( COMPONENTS_DELIM );
-		}
-		else if( pb instanceof ExternalFunctionProgramBlock )
-		{
-			if( !(pb instanceof ExternalFunctionProgramBlockCP) ) 
-			{
-				throw new DMLRuntimeException( NOT_SUPPORTED_EXTERNALFUNCTION_PB );
-			}
-			
-			ExternalFunctionProgramBlockCP fpb = (ExternalFunctionProgramBlockCP) pb;
-			
-			sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( serializeStringHashMap( fpb.getOtherParams() ) );
-			sb.append( COMPONENTS_DELIM );
-			sb.append( fpb.getBaseDir() );
-			sb.append( COMPONENTS_DELIM );
-			
-			sb.append( PARFOR_INST_BEGIN );
-			//create on construction anyway 
-			sb.append( PARFOR_INST_END );	
-			sb.append( COMPONENTS_DELIM );
-			sb.append( PARFOR_PBS_BEGIN );
-			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
-			sb.append( PARFOR_PBS_END );
-		}
-		else //all generic program blocks
-		{
-			sb.append( PARFOR_INST_BEGIN );
-			sb.append( serializeInstructions(pb.getInstructions(), clsMap) );
-			sb.append( PARFOR_INST_END );
-		}
-		
-		
-		//handle end
-		sb.append( PARFOR_PB_END );
-		
-		return sb.toString();
-	}
-
-	
-	////////////////////////////////
-	// PARSING 
-	////////////////////////////////
-	public static ParForBody parseParForBody( String in, int id ) {
-		return parseParForBody(in, id, false);
-	}
-	
-	public static ParForBody parseParForBody( String in, int id, boolean inSpark ) {
-		ParForBody body = new ParForBody();
-		
-		//header elimination
-		String tmpin = in.replaceAll(NEWLINE, ""); //normalization
-		tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);
-		
-		//handle DMLScript UUID (NOTE: set directly in DMLScript)
-		//(master UUID is used for all nodes (in order to simply cleanup))
-		DMLScript.setUUID( st.nextToken() );
-		
-		//handle DML config (NOTE: set directly in ConfigurationManager)
-		String confStr = st.nextToken();
-		JobConf job = ConfigurationManager.getCachedJobConf();
-		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-			if( confStr != null && !confStr.trim().isEmpty() ) {
-				DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr);
-				CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf);
-				ConfigurationManager.setLocalConfig(dmlconf);
-				ConfigurationManager.setLocalConfig(cconf);
-			}
-			//init internal configuration w/ parsed or default config
-			ParForProgramBlock.initInternalConfigurations(
-					ConfigurationManager.getDMLConfig());
-		}
-		
-		//handle additional configs
-		String aconfs = st.nextToken();
-		if( !inSpark )
-			parseAndSetAdditionalConfigurations( aconfs );
-		
-		//handle program
-		String progStr = st.nextToken();
-		Program prog = parseProgram( progStr, id ); 
-		
-		//handle result variable names
-		String rvarStr = st.nextToken();
-		ArrayList<ResultVar> rvars = parseResultVariables(rvarStr);
-		body.setResultVariables(rvars);
-		
-		//handle execution context
-		String ecStr = st.nextToken();
-		ExecutionContext ec = parseExecutionContext( ecStr, prog );
-		
-		//handle program blocks
-		String spbs = st.nextToken();
-		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
-		
-		body.setChildBlocks( pbs );
-		body.setEc( ec );
-		
-		return body;
-	}
-
-	public static Program parseProgram( String in, int id ) {
-		String lin = in.substring( PARFOR_PROG_BEGIN.length(),in.length()-PARFOR_PROG_END.length()).trim(); 
-		
-		Program prog = new Program();
-		HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id);
-	
-		for( Entry<String,FunctionProgramBlock> e : fc.entrySet() )
-		{
-			String[] keypart = e.getKey().split( Program.KEY_DELIM );
-			String namespace = keypart[0];
-			String name      = keypart[1];
-			
-			prog.addFunctionProgramBlock(namespace, name, e.getValue());
-		}
-		
-		return prog;
-	}
-
-	private static LocalVariableMap parseVariables(String in) {
-		LocalVariableMap ret = null;
-		
-		if( in.length()> PARFOR_VARS_BEGIN.length() + PARFOR_VARS_END.length())
-		{
-			String varStr = in.substring( PARFOR_VARS_BEGIN.length(),in.length()-PARFOR_VARS_END.length()).trim(); 
-			ret = LocalVariableMap.deserialize(varStr);
-		}
-		else //empty input symbol table
-		{
-			ret = new LocalVariableMap();
-		}
-
-		return ret;
-	}
-
-	private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) {
-		HashMap<String,FunctionProgramBlock> ret = new HashMap<>();
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM );
-		
-		while( st.hasMoreTokens() )
-		{
-			String lvar  = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use
-			
-			//put first copy into prog (for direct use)
-			int index = lvar.indexOf( KEY_VALUE_DELIM );
-			String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id;
-			String tmp2 = lvar.substring(index + 1);
-			ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
-		}
-
-		return ret;
-	}
-
-	private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) {
-		ArrayList<ProgramBlock> pbs = new ArrayList<>();
-		String tmpdata = in.substring(PARFOR_PBS_BEGIN.length(),in.length()-PARFOR_PBS_END.length());
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM);
-		
-		while( st.hasMoreTokens() )
-		{
-			String tmp = st.nextToken();
-			pbs.add( rParseProgramBlock( tmp, prog, id ) );
-		}
-		
-		return pbs;
-	}
-
-	private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) {
-		ProgramBlock pb = null;
-		
-		if( in.startsWith( PARFOR_PB_WHILE ) )
-			pb = rParseWhileProgramBlock( in, prog, id );
-		else if ( in.startsWith(PARFOR_PB_FOR ) )
-			pb = rParseForProgramBlock( in, prog, id );
-		else if ( in.startsWith(PARFOR_PB_PARFOR ) )
-			pb = rParseParForProgramBlock( in, prog, id );
-		else if ( in.startsWith(PARFOR_PB_IF ) )
-			pb = rParseIfProgramBlock( in, prog, id );
-		else if ( in.startsWith(PARFOR_PB_FC ) )
-			pb = rParseFunctionProgramBlock( in, prog, id );
-		else if ( in.startsWith(PARFOR_PB_EFC ) )
-			pb = rParseExternalFunctionProgramBlock( in, prog, id );
- 		else if ( in.startsWith(PARFOR_PB_BEGIN ) )
-			pb = rParseGenericProgramBlock( in, prog, id );
-		else 
-			throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in );
-		
-		return pb;
-	}
-
-	private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) {
-		String lin = in.substring( PARFOR_PB_WHILE.length(),in.length()-PARFOR_PB_END.length()); 
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-		
-		//predicate instructions
-		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-		
-		//exit instructions
-		ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
-		
-		//program blocks
-		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-		
-		WhileProgramBlock wpb = new WhileProgramBlock(prog,inst);
-		wpb.setExitInstructions2(exit);
-		wpb.setChildBlocks(pbs);
-		
-		return wpb;
-	}
-
-	private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) {
-		String lin = in.substring( PARFOR_PB_FOR.length(),in.length()-PARFOR_PB_END.length()); 
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-		
-		//inputs
-		String iterVar = st.nextToken();
-		
-		//instructions
-		ArrayList<Instruction> from = parseInstructions(st.nextToken(),id);
-		ArrayList<Instruction> to = parseInstructions(st.nextToken(),id);
-		ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id);
-		
-		//exit instructions
-		ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
-		
-		//program blocks
-		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-		
-		ForProgramBlock fpb = new ForProgramBlock(prog, iterVar);
-		fpb.setFromInstructions(from);
-		fpb.setToInstructions(to);
-		fpb.setIncrementInstructions(incr);
-		fpb.setExitInstructions(exit);
-		fpb.setChildBlocks(pbs);
-		
-		return fpb;
-	}
-
-	private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) {
-		String lin = in.substring( PARFOR_PB_PARFOR.length(),in.length()-PARFOR_PB_END.length()); 
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-		
-		//inputs
-		String iterVar = st.nextToken();
-		ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken());
-		HashMap<String,String> params = parseStringHashMap(st.nextToken());
-		
-		//instructions 
-		ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0);
-		ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0);
-		ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0);
-		
-		//exit instructions
-		ArrayList<Instruction> exit = parseInstructions(st.nextToken(), 0);
-		
-		//program blocks //reset id to preinit state, replaced during exec
-		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); 
-		
-		ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars);
-		pfpb.disableOptimization(); //already done in top-level parfor
-		pfpb.setFromInstructions(from);
-		pfpb.setToInstructions(to);
-		pfpb.setIncrementInstructions(incr);
-		pfpb.setExitInstructions(exit);
-		pfpb.setChildBlocks(pbs);
-		
-		return pfpb;
-	}
-
-	private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) {
-		String lin = in.substring( PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length());
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-		
-		//predicate instructions
-		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-		
-		//exit instructions
-		ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
-		
-		//program blocks: if and else
-		ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id);
-		ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id);
-		
-		IfProgramBlock ipb = new IfProgramBlock(prog,inst);
-		ipb.setExitInstructions2(exit);
-		ipb.setChildBlocksIfBody(pbs1);
-		ipb.setChildBlocksElseBody(pbs2);
-		
-		return ipb;
-	}
-
-	private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) {
-		String lin = in.substring( PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length());
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-		
-		//inputs and outputs
-		ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
-		ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
-		
-		//instructions
-		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-
-		//program blocks
-		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-
-		ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
-		ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
-		FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2);
-		fpb.setInstructions(inst);
-		fpb.setChildBlocks(pbs);
-		
-		return fpb;
-	}
-
-	private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) {
-		String lin = in.substring( PARFOR_PB_EFC.length(),in.length()-PARFOR_PB_END.length()); 
-		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-		
-		//LocalVariableMap vars = parseVariables(st.nextToken());
-		
-		//inputs, outputs and params
-		ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
-		ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
-		HashMap<String,String> dat3 = parseStringHashMap(st.nextToken());
-
-		//basedir
-		String basedir = st.nextToken();
-		
-		//instructions (required for removing INST BEGIN, END)
-		parseInstructions(st.nextToken(),id);
-
-		//program blocks
-		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-
-		ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
-		ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
-		
-		//only CP external functions, because no nested MR jobs for reblocks
-		ExternalFunctionProgramBlockCP efpb = new ExternalFunctionProgramBlockCP(prog, tmp1, tmp2, dat3, basedir);
-		efpb.setChildBlocks(pbs);
-		
-		return efpb;
-	}
-
-	private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) {
-		String lin = in.substring( PARFOR_PB_BEGIN.length(),in.length()-PARFOR_PB_END.length()); 
-		StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM);
-		
-		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-		
-		ProgramBlock pb = new ProgramBlock(prog);
-		pb.setInstructions(inst);
-		
-		return pb;
-	}
-
-	private static ArrayList<Instruction> parseInstructions( String in, int id ) {
-		ArrayList<Instruction> insts = new ArrayList<>();
-		String lin = in.substring( PARFOR_INST_BEGIN.length(),in.length()-PARFOR_INST_END.length()); 
-		StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM);
-		while(st.hasMoreTokens()) {
-			//Note that at this point only CP instructions and External function instruction can occur
-			String instStr = st.nextToken(); 
-			try {
-				Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr);
-				tmpinst = saveReplaceThreadID(tmpinst, CP_ROOT_THREAD_ID, CP_CHILD_THREAD+id );
-				insts.add( tmpinst );
-			}
-			catch(Exception ex) {
-				throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex);
-			}
-		}
-		return insts;
-	}
-	
-	private static ArrayList<ResultVar> parseResultVariables(String in) {
-		ArrayList<ResultVar> ret = new ArrayList<>();
-		for(String var : parseStringArrayList(in)) {
-			boolean accum = var.endsWith("+");
-			ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum));
-		}
-		return ret;
-	}
-
-	private static HashMap<String,String> parseStringHashMap( String in ) {
-		HashMap<String,String> vars = new HashMap<>();
-		StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
-		while( st.hasMoreTokens() ) {
-			String lin = st.nextToken();
-			int index = lin.indexOf( KEY_VALUE_DELIM );
-			String tmp1 = lin.substring(0, index);
-			String tmp2 = lin.substring(index + 1);
-			vars.put(tmp1, tmp2);
-		}
-		return vars;
-	}
-
-	private static ArrayList<String> parseStringArrayList( String in )
-	{
-		ArrayList<String> vars = new ArrayList<>();
-		StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
-		while( st.hasMoreTokens() ) {
-			String tmp = st.nextToken();
-			vars.add(tmp);
-		}
-		
-		return vars;
-	}
-
-	private static ArrayList<DataIdentifier> parseDataIdentifiers( String in )
-	{
-		ArrayList<DataIdentifier> vars = new ArrayList<>();
-		StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
-		while( st.hasMoreTokens() ) {
-			String tmp = st.nextToken();
-			DataIdentifier dat = parseDataIdentifier( tmp );
-			vars.add(dat);
-		}
-
-		return vars;
-	}
-
-	private static DataIdentifier parseDataIdentifier( String in )
-	{
-		StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM);
-		String name = st.nextToken();
-		DataType dt = DataType.valueOf(st.nextToken());
-		ValueType vt = ValueType.valueOf(st.nextToken());
-		
-		DataIdentifier dat = new DataIdentifier(name);
-		dat.setDataType(dt);
-		dat.setValueType(vt);
-		
-		return dat;
-	}
-	
-	/**
-	 * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and
-	 * related symbol tables can be hierarchically structured.
-	 * 
-	 * @param in data object as string
-	 * @return array of objects
-	 */
-	public static Object[] parseDataObject(String in) {
-		Object[] ret = new Object[2];
-	
-		StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM );
-		String name = st.nextToken();
-		DataType datatype = DataType.valueOf( st.nextToken() );
-		ValueType valuetype = ValueType.valueOf( st.nextToken() );
-		String valString = st.hasMoreTokens() ? st.nextToken() : "";
-		Data dat = null;
-		switch( datatype )
-		{
-			case SCALAR:
-			{
-				switch ( valuetype )
-				{
-					case INT:
-						dat = new IntObject(Long.parseLong(valString));
-						break;
-					case DOUBLE:
-						dat = new DoubleObject(Double.parseDouble(valString));
-						break;
-					case BOOLEAN:
-						dat = new BooleanObject(Boolean.parseBoolean(valString));
-						break;
-					case STRING:
-						dat = new StringObject(valString);
-						break;
-					default:
-						throw new DMLRuntimeException("Unable to parse valuetype "+valuetype);
-				}
-				break;
-		    }
-			case MATRIX:
-			{
-				MatrixObject mo = new MatrixObject(valuetype,valString);
-				long rows = Long.parseLong( st.nextToken() );
-				long cols = Long.parseLong( st.nextToken() );
-				int brows = Integer.parseInt( st.nextToken() );
-				int bcols = Integer.parseInt( st.nextToken() );
-				long nnz = Long.parseLong( st.nextToken() );
-				InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() );
-				OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() );
-				PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() );
-				UpdateType inplace = UpdateType.valueOf( st.nextToken() );
-				MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz); 
-				MetaDataFormat md = new MetaDataFormat( mc, oin, iin );
-				mo.setMetaData( md );
-				if( partFormat._dpf != PDataPartitionFormat.NONE )
-					mo.setPartitioned( partFormat._dpf, partFormat._N );
-				mo.setUpdateType(inplace);
-				dat = mo;
-				break;
-			}
-			default:
-				throw new DMLRuntimeException("Unable to parse datatype "+datatype);
-		}
-		
-		ret[0] = name;
-		ret[1] = dat;
-		return ret;
-	}
-
-	private static ExecutionContext parseExecutionContext(String in, Program prog) 
-	{
-		ExecutionContext ec = null;
-		
-		String lin = in.substring(PARFOR_EC_BEGIN.length(),in.length()-PARFOR_EC_END.length()).trim(); 
-		
-		if( !lin.equals( EMPTY ) )
-		{
-			LocalVariableMap vars = parseVariables(lin);
-			ec = ExecutionContextFactory.createContext( false, prog );
-			ec.setVariables(vars);
-		}
-		
-		return ec;
-	}
-	
-	private static void parseAndSetAdditionalConfigurations(String conf) {
-		String[] statsFlag = conf.split("=");
-		DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]);
-	}
-
-	//////////
-	// CUSTOM SAFE LITERAL REPLACEMENT
-	
-	/**
-	 * In-place replacement of thread ids in filenames, functions names etc
-	 * 
-	 * @param inst instruction
-	 * @param pattern ?
-	 * @param replacement string replacement
-	 * @return instruction
-	 */
-	private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) 
-	{
-		//currently known, relevant instructions: createvar, rand, seq, extfunct, 
-		if( inst instanceof MRJobInstruction )
-		{
-			//update dims file, and internal string representations of rand/seq instructions
-			MRJobInstruction mrinst = (MRJobInstruction)inst;
-			mrinst.updateInstructionThreadID(pattern, replacement);
-		}
-		else if ( inst instanceof VariableCPInstruction )  //createvar, setfilename
-		{
-			//update in-memory representation
-			inst.updateInstructionThreadID(pattern, replacement);
-		}
-		//NOTE> //Rand, seq in CP not required
-		//else if( inst.toString().contains(pattern) )
-		//	throw new DMLRuntimeException( "DEBUG: Missed thread id replacement: "+inst );
-		
-		return inst;
-	}
-	
-	public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace)
-	{
-		//save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path
-		//replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end.
-	    int pos = fname.lastIndexOf(pattern);
-	    if( pos < 0 )
-	    	return fname;
-	    return fname.substring(0, pos) + replace + fname.substring(pos+pattern.length());
-	}
-	
-	
-	//////////
-	// CUSTOM HIERARCHICAL TOKENIZER
-	
-	
-	/**
-	 * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to
-	 * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy
-	 * levels are skipped.  
-	 * 
-	 */
-	private static class HierarchyAwareStringTokenizer //extends StringTokenizer
-	{
-		private String _str = null;
-		private String _del = null;
-		private int    _off = -1;
-		
-		public HierarchyAwareStringTokenizer( String in, String delim )
-		{
-			//super(in);
-			_str = in;
-			_del = delim;
-			_off = delim.length();
-		}
-
-		public boolean hasMoreTokens() 
-		{
-			return (_str.length() > 0);
-		}
-
-		public String nextToken() 
-		{
-			int nextDelim = determineNextSameLevelIndexOf(_str, _del);
-			String token = null;
-			if(nextDelim < 0) 
-			{
-				nextDelim = _str.length();
-				_off = 0;
-			}
-			token = _str.substring(0,nextDelim);
-			_str = _str.substring( nextDelim + _off );
-			return token;
-		}
-				
-		private static int determineNextSameLevelIndexOf( String data, String pattern  )
-		{
-			String tmpdata = data;
-			int index      = 0;
-			int count      = 0;
-			int off=0,i1,i2,i3,min;
-			
-			while(true)
-			{
-				i1 = tmpdata.indexOf(pattern);
-				i2 = tmpdata.indexOf(LEVELIN);
-				i3 = tmpdata.indexOf(LEVELOUT);
-				
-				if( i1 < 0 ) return i1; //no pattern found at all
-				
-				min = i1; //min >= 0 by definition
-				if( i2 >= 0 ) min = Math.min(min, i2);
-				if( i3 >= 0 ) min = Math.min(min, i3);
-				
-				//stack maintenance
-				if( i1 == min && count == 0 )
-					return index+i1;
-				else if( i2 == min )
-				{
-					count++;
-					off = LEVELIN.length();
-				}
-				else if( i3 == min )
-				{
-					count--;
-					off = LEVELOUT.length();
-				}
-			
-				//prune investigated string
-				index += min+off;
-				tmpdata = tmpdata.substring(min+off);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index b51df84..1f25032 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -57,6 +57,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.utils.Statistics;
 import org.apache.sysml.yarn.DMLAppMasterUtils;
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 885b2b7..66b4283 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -37,14 +37,13 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysml.runtime.instructions.cp.IntObject;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
 
 import scala.Tuple2;
 
@@ -151,23 +150,9 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_numTasks    = 0;
 		_numIters    = 0;
 
-		//init and register-cleanup of buffer pool (in parfor spark, multiple tasks might 
-		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 
-		//the initialization and immediately register the created directory for cleanup
-		//on process exit, i.e., executor exit, including any files created in the future.
-		synchronized( CacheableData.class ) {
-			if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { 
-				//create id, executor working dir, and cache dir
-				String uuid = IDHandler.createDistributedUniqueID();
-				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-				CacheableData.initCaching( uuid ); //incl activation and cache dir creation
-				CacheableData.cacheEvictionLocalFilePrefix = 
-						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
-				//register entire working dir for delete on shutdown
-				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
-			}
-		}
-		
+		//setup the buffer pool
+		RemoteParForUtils.setupBufferPool(_workerID);
+
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 6086d25..3f235cc 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -48,6 +48,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.utils.Statistics;
 
 public class RemoteDPParWorkerReducer extends ParWorker

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index baec5a2..81a5e65 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -54,6 +54,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.utils.Statistics;
 import org.apache.sysml.yarn.DMLAppMasterUtils;
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index ab29148..fd0b9eb 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -37,8 +37,7 @@ import org.apache.sysml.runtime.codegen.CodegenUtils;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
-import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 import scala.Tuple2;
@@ -128,23 +127,9 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 			.map(v -> v._name).collect(Collectors.toList()), _ec.getVarListPartitioned());
 		reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist, _brInputs, _cleanCache);
 		
-		//init and register-cleanup of buffer pool (in parfor spark, multiple tasks might 
-		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 
-		//the initialization and immediately register the created directory for cleanup
-		//on process exit, i.e., executor exit, including any files created in the future.
-		synchronized( CacheableData.class ) {
-			if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { 
-				//create id, executor working dir, and cache dir
-				String uuid = IDHandler.createDistributedUniqueID();
-				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-				CacheableData.initCaching( uuid ); //incl activation and cache dir creation
-				CacheableData.cacheEvictionLocalFilePrefix = 
-						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
-				//register entire working dir for delete on shutdown
-				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
-			}
-		}
-		
+		//setup the buffer pool
+		RemoteParForUtils.setupBufferPool(_workerID);
+
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index fb2adc2..2cf35c7 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -45,8 +45,10 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.utils.Statistics;
 
 /**
@@ -237,6 +239,29 @@ public class RemoteParForUtils
 		//create return array
 		return tmp.values().toArray(new LocalVariableMap[0]);	
 	}
+
+	/**
+	 * Init and register-cleanup of buffer pool
+	 * @param workerID worker id
+	 * @throws IOException exception
+	 */
+	public static void setupBufferPool(long workerID) throws IOException {
+		//init and register-cleanup of buffer pool (in spark, multiple tasks might
+		//share the process-local, i.e., per executor, buffer pool; hence we synchronize
+		//the initialization and immediately register the created directory for cleanup
+		//on process exit, i.e., executor exit, including any files created in the future.
+		synchronized(CacheableData.class) {
+			if (!CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode()) {
+				//create id, executor working dir, and cache dir
+				String uuid = IDHandler.createDistributedUniqueID();
+				LocalFileUtils.createWorkingDirectoryWithUUID(uuid);
+				CacheableData.initCaching(uuid); //incl activation and cache dir creation
+				CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix + "_" + workerID;
+				//register entire working dir for delete on shutdown
+				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+			}
+		}
+	}
 	
 	/**
 	 * Task to be registered as shutdown hook in order to delete the 

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
index 7029061..0bdb92e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
@@ -42,6 +42,7 @@ import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.utils.Statistics;
 
 /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index ed3b758..d345d01 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -81,7 +81,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeLocalFile;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
@@ -438,7 +438,7 @@ public class OptimizerRuleBased extends Optimizer
 	
 		_numEvaluatedPlans++;
 		LOG.debug(getOptMode()+" OPT: rewrite 'set data partitioner' - result="+pdp.toString()+
-			" ("+ProgramConverter.serializeStringCollection(partitionedMatrices.keySet())+")" );
+			" ("+Arrays.toString(partitionedMatrices.keySet().toArray())+")" );
 		
 		return blockwise;
 	}
@@ -1809,8 +1809,8 @@ public class OptimizerRuleBased extends Optimizer
 			pfpb.setRuntimePiggybacking(apply);
 		
 		_numEvaluatedPlans++;
-		LOG.debug(getOptMode()+" OPT: rewrite 'enable runtime piggybacking' - result="+apply+
-				" ("+ProgramConverter.serializeStringCollection(sharedVars)+")" );
+		LOG.debug(getOptMode()+" OPT: rewrite 'enable runtime piggybacking' - result="
+			+apply+" ("+Arrays.toString(sharedVars.toArray())+")" );
 	}
 
 	protected boolean rHasSharedMRInput( OptNode n, Set<String> inputVars, Set<String> partitionedVars, HashSet<String> sharedVars ) 
@@ -1931,8 +1931,8 @@ public class OptimizerRuleBased extends Optimizer
 		}
 		
 		_numEvaluatedPlans++;
-		LOG.debug(getOptMode()+" OPT: rewrite 'inject spark input repartition' - result="+ret.size()+
-				" ("+ProgramConverter.serializeStringCollection(ret)+")" );
+		LOG.debug(getOptMode()+" OPT: rewrite 'inject spark input repartition' - result="
+			+ret.size()+" ("+Arrays.toString(ret.toArray())+")" );
 	}
 
 	private void rCollectZipmmPartitioningCandidates( OptNode n, HashSet<String> cand )
@@ -2008,8 +2008,8 @@ public class OptimizerRuleBased extends Optimizer
 		}
 		
 		_numEvaluatedPlans++;
-		LOG.debug(getOptMode()+" OPT: rewrite 'set spark eager rdd caching' - result="+ret.size()+
-				" ("+ProgramConverter.serializeStringCollection(ret)+")" );
+		LOG.debug(getOptMode()+" OPT: rewrite 'set spark eager rdd caching' - result="
+			+ret.size()+" ("+Arrays.toString(ret.toArray())+")" );
 	}
 	
 	///////

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
index 65e9d3a..9d6f133 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
@@ -36,7 +36,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.matrix.JobReturn;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;


[4/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function shipping and worker setup

Posted by mb...@apache.org.
[SYSTEMML-2419] Paramserv spark function shipping and worker setup

Closes #799.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/cffefca3
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/cffefca3
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/cffefca3

Branch: refs/heads/master
Commit: cffefca30e89ce249c3030d23123c1b3aba1757a
Parents: 614adec
Author: EdgarLGB <gu...@atos.net>
Authored: Sun Jul 15 22:01:54 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun Jul 15 22:02:04 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/DMLScript.java    |    4 +-
 .../java/org/apache/sysml/hops/DataGenOp.java   |    5 +-
 .../org/apache/sysml/hops/OptimizerUtils.java   |    3 +-
 .../apache/sysml/hops/recompile/Recompiler.java |    5 +-
 src/main/java/org/apache/sysml/lops/Lop.java    |    2 +
 .../java/org/apache/sysml/lops/compile/Dag.java |    3 +-
 .../org/apache/sysml/parser/DMLTranslator.java  |    3 +-
 .../controlprogram/LocalVariableMap.java        |    2 +-
 .../controlprogram/ParForProgramBlock.java      |    2 +-
 .../context/ExecutionContext.java               |    4 +
 .../controlprogram/paramserv/PSWorker.java      |   30 +-
 .../paramserv/ParamservUtils.java               |   35 +-
 .../paramserv/spark/SparkPSBody.java            |   46 +
 .../paramserv/spark/SparkPSWorker.java          |   43 +-
 .../controlprogram/parfor/ProgramConverter.java | 1866 ------------------
 .../controlprogram/parfor/RemoteDPParForMR.java |    1 +
 .../parfor/RemoteDPParForSparkWorker.java       |   23 +-
 .../parfor/RemoteDPParWorkerReducer.java        |    1 +
 .../controlprogram/parfor/RemoteParForMR.java   |    1 +
 .../parfor/RemoteParForSparkWorker.java         |   23 +-
 .../parfor/RemoteParForUtils.java               |   25 +
 .../parfor/RemoteParWorkerMapper.java           |    1 +
 .../parfor/opt/OptimizerRuleBased.java          |   16 +-
 .../runtime/instructions/MRJobInstruction.java  |    2 +-
 .../cp/ParamservBuiltinCPInstruction.java       |   34 +-
 .../instructions/cp/VariableCPInstruction.java  |    2 +-
 .../sysml/runtime/util/ProgramConverter.java    | 1838 +++++++++++++++++
 .../paramserv/ParamservLocalNNTest.java         |   93 +
 .../functions/paramserv/ParamservNNTest.java    |   94 -
 .../paramserv/ParamservSparkNNTest.java         |   47 +
 .../functions/paramserv/SerializationTest.java  |   80 +
 .../parfor/ParForAdversarialLiteralsTest.java   |   13 +-
 .../paramserv/mnist_lenet_paramserv.dml         |    4 +-
 .../paramserv/paramserv-nn-asp-batch.dml        |    2 +-
 .../paramserv/paramserv-nn-asp-epoch.dml        |    2 +-
 .../paramserv/paramserv-nn-bsp-batch-dc.dml     |    2 +-
 .../paramserv/paramserv-nn-bsp-batch-dr.dml     |    2 +-
 .../paramserv/paramserv-nn-bsp-batch-drr.dml    |    2 +-
 .../paramserv/paramserv-nn-bsp-batch-or.dml     |    2 +-
 .../paramserv/paramserv-nn-bsp-epoch.dml        |    2 +-
 .../paramserv-spark-nn-bsp-batch-dc.dml         |   53 +
 .../functions/paramserv/ZPackageSuite.java      |    3 +-
 42 files changed, 2332 insertions(+), 2089 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/api/DMLScript.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java b/src/main/java/org/apache/sysml/api/DMLScript.java
index 227c14b..9c8a3eb 100644
--- a/src/main/java/org/apache/sysml/api/DMLScript.java
+++ b/src/main/java/org/apache/sysml/api/DMLScript.java
@@ -28,6 +28,7 @@ import java.io.InputStreamReader;
 import java.net.URI;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -76,7 +77,6 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool;
@@ -853,7 +853,7 @@ public class DMLScript
 		
 		LOG.debug("SystemML security check: "
 				+ "local.user.name = " + userName + ", "
-				+ "local.user.groups = " + ProgramConverter.serializeStringCollection(groupNames) + ", "
+				+ "local.user.groups = " + Arrays.toString(groupNames.toArray()) + ", "
 				+ MRConfigurationNames.MR_JOBTRACKER_ADDRESS + " = " + job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS) + ", "
 				+ MRConfigurationNames.MR_TASKTRACKER_TASKCONTROLLER + " = " + taskController + ","
 				+ MRConfigurationNames.MR_TASKTRACKER_GROUP + " = " + ttGroupName + ", "

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/hops/DataGenOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/DataGenOp.java b/src/main/java/org/apache/sysml/hops/DataGenOp.java
index 9e59235..b374185 100644
--- a/src/main/java/org/apache/sysml/hops/DataGenOp.java
+++ b/src/main/java/org/apache/sysml/hops/DataGenOp.java
@@ -34,7 +34,6 @@ import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.parser.Statement;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
@@ -108,8 +107,8 @@ public class DataGenOp extends MultiThreadedHop
 		
 		//generate base dir
 		String scratch = ConfigurationManager.getScratchSpace();
-		_baseDir = scratch + Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID() + Lop.FILE_SEPARATOR + 
-	               Lop.FILE_SEPARATOR + ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
+		_baseDir = scratch + Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID() + Lop.FILE_SEPARATOR 
+			+ Lop.FILE_SEPARATOR + Lop.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
 		
 		//compute unknown dims and nnz
 		refreshSizeInformation();

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 3f1a8cf..fb83df0 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -46,7 +46,6 @@ import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.functionobjects.IntegerDivide;
 import org.apache.sysml.runtime.functionobjects.Modulus;
@@ -927,7 +926,7 @@ public class OptimizerUtils
 	public static String getUniqueTempFileName() {
 		return ConfigurationManager.getScratchSpace()
 			+ Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID()
-			+ Lop.FILE_SEPARATOR + ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR 
+			+ Lop.FILE_SEPARATOR + Lop.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR 
 			+ Dag.getNextUniqueFilenameSuffix();
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index d5b0043..4175eac 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -82,7 +82,7 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.opt.OptTreeConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.Instruction;
@@ -589,8 +589,7 @@ public class Recompiler
 		//update function names
 		if( hop instanceof FunctionOp && ((FunctionOp)hop).getFunctionType() != FunctionType.MULTIRETURN_BUILTIN) {
 			FunctionOp fop = (FunctionOp) hop;
-			fop.setFunctionName( fop.getFunctionName() +
-					             ProgramConverter.CP_CHILD_THREAD + pid);
+			fop.setFunctionName( fop.getFunctionName() + Lop.CP_CHILD_THREAD + pid);
 		}
 		
 		if( hop.getInput() != null )

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/lops/Lop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Lop.java b/src/main/java/org/apache/sysml/lops/Lop.java
index ff2d515..9e81496 100644
--- a/src/main/java/org/apache/sysml/lops/Lop.java
+++ b/src/main/java/org/apache/sysml/lops/Lop.java
@@ -77,6 +77,8 @@ public abstract class Lop
 	
 	public static final String FILE_SEPARATOR = "/";
 	public static final String PROCESS_PREFIX = "_p";
+	public static final String CP_ROOT_THREAD_ID = "_t0";
+	public static final String CP_CHILD_THREAD = "_t";
 	
 	//special delimiters w/ extended ASCII characters to avoid collisions 
 	public static final String INSTRUCTION_DELIMITOR = "\u2021";

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index b1d6865..452f030 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -69,7 +69,6 @@ import org.apache.sysml.parser.Expression;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.StatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.instructions.CPInstructionParser;
 import org.apache.sysml.runtime.instructions.Instruction;
@@ -198,7 +197,7 @@ public class Dag<N extends Lop>
 			scratchFilePath = scratch + Lop.FILE_SEPARATOR
 				+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
 				+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
-				+ ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
+				+ Lop.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
 		}
 		return scratchFilePath;
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index 089edce..b9e5f9d 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -88,7 +88,6 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
 import org.apache.sysml.runtime.controlprogram.Program;
 import org.apache.sysml.runtime.controlprogram.ProgramBlock;
 import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.instructions.Instruction;
 
 
@@ -613,7 +612,7 @@ public class DMLTranslator
 				buff.append(Lop.PROCESS_PREFIX);
 				buff.append(DMLScript.getUUID());
 				buff.append(Lop.FILE_SEPARATOR);
-				buff.append(ProgramConverter.CP_ROOT_THREAD_ID);
+				buff.append(Lop.CP_ROOT_THREAD_ID);
 				buff.append(Lop.FILE_SEPARATOR);
 				buff.append("PackageSupport");
 				buff.append(Lop.FILE_SEPARATOR);

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
index 2081aae..62ae3d0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
@@ -28,7 +28,7 @@ import java.util.StringTokenizer;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.utils.Statistics;

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 83e9cde..ba490f3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -65,7 +65,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteSpark
 import org.apache.sysml.runtime.controlprogram.parfor.LocalParWorker;
 import org.apache.sysml.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysml.runtime.controlprogram.parfor.ParForBody;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
 import org.apache.sysml.runtime.controlprogram.parfor.RemoteDPParForMR;
 import org.apache.sysml.runtime.controlprogram.parfor.RemoteDPParForSpark;
 import org.apache.sysml.runtime.controlprogram.parfor.RemoteParForJobReturn;

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index d0d0b08..2e0addf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -101,6 +101,10 @@ public class ExecutionContext {
 	public Program getProgram(){
 		return _prog;
 	}
+
+	public void setProgram(Program prog) {
+		_prog = prog;
+	}
 	
 	public LocalVariableMap getVariables() {
 		return _variables;

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
index a76dfec..1ab5f5e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
@@ -34,23 +34,25 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
 
-@SuppressWarnings("unused")
 public abstract class PSWorker {
-
-	protected final int _workerID;
-	protected final int _epochs;
-	protected final long _batchSize;
-	protected final ExecutionContext _ec;
-	protected final ParamServer _ps;
-	protected final DataIdentifier _output;
-	protected final FunctionCallCPInstruction _inst;
+	protected int _workerID;
+	protected int _epochs;
+	protected long _batchSize;
+	protected ExecutionContext _ec;
+	protected ParamServer _ps;
+	protected DataIdentifier _output;
+	protected FunctionCallCPInstruction _inst;
 	protected MatrixObject _features;
 	protected MatrixObject _labels;
-	
-	private MatrixObject _valFeatures;
-	private MatrixObject _valLabels;
-	private final String _updFunc;
-	protected final Statement.PSFrequency _freq;
+
+	protected MatrixObject _valFeatures;
+	protected MatrixObject _valLabels;
+	protected String _updFunc;
+	protected Statement.PSFrequency _freq;
+
+	protected PSWorker() {
+
+	}
 	
 	protected PSWorker(int workerID, String updFunc, Statement.PSFrequency freq, int epochs, long batchSize,
 		MatrixObject valFeatures, MatrixObject valLabels, ExecutionContext ec, ParamServer ps) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
index ec6b6b2..ee15709 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -57,7 +57,6 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkAggregator;
 import org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkMapper;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.instructions.cp.ListObject;
@@ -68,6 +67,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysml.runtime.util.ProgramConverter;
 
 import scala.Tuple2;
 
@@ -175,9 +175,8 @@ public class ParamservUtils {
 			new String[]{ns, name} : new String[]{ns, name};
 	}
 
-	public static List<ExecutionContext> createExecutionContexts(ExecutionContext ec, LocalVariableMap varsMap,
-		String updFunc, String aggFunc, int workerNum, int k) {
-
+	public static ExecutionContext createExecutionContext(ExecutionContext ec, LocalVariableMap varsMap, String updFunc,
+		String aggFunc, int k) {
 		FunctionProgramBlock updPB = getFunctionBlock(ec, updFunc);
 		FunctionProgramBlock aggPB = getFunctionBlock(ec, aggFunc);
 
@@ -188,27 +187,21 @@ public class ParamservUtils {
 		// 2. Recompile the imported function blocks
 		prog.getFunctionProgramBlocks().forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks()));
 
-		// 3. Copy function for workers
-		List<ExecutionContext> workerECs = IntStream.range(0, workerNum)
-			.mapToObj(i -> {
-				FunctionProgramBlock newUpdFunc = copyFunction(updFunc, updPB);
-				FunctionProgramBlock newAggFunc = copyFunction(aggFunc, aggPB);
-				Program newProg = new Program();
-				putFunction(newProg, newUpdFunc);
-				putFunction(newProg, newAggFunc);
-				return ExecutionContextFactory.createContext(new LocalVariableMap(varsMap), newProg);
-			})
-			.collect(Collectors.toList());
-
-		// 4. Copy function for agg service
+		// 3. Copy function
+		FunctionProgramBlock newUpdFunc = copyFunction(updFunc, updPB);
 		FunctionProgramBlock newAggFunc = copyFunction(aggFunc, aggPB);
 		Program newProg = new Program();
+		putFunction(newProg, newUpdFunc);
 		putFunction(newProg, newAggFunc);
-		ExecutionContext aggEC = ExecutionContextFactory.createContext(new LocalVariableMap(varsMap), newProg);
+		return ExecutionContextFactory.createContext(new LocalVariableMap(varsMap), newProg);
+	}
 
-		List<ExecutionContext> result = new ArrayList<>(workerECs);
-		result.add(aggEC);
-		return result;
+	public static List<ExecutionContext> copyExecutionContext(ExecutionContext ec, int num) {
+		return IntStream.range(0, num).mapToObj(i -> {
+			Program newProg = new Program();
+			ec.getProgram().getFunctionProgramBlocks().forEach((func, pb) -> putFunction(newProg, copyFunction(func, pb)));
+			return ExecutionContextFactory.createContext(new LocalVariableMap(ec.getVariables()), newProg);
+		}).collect(Collectors.toList());
 	}
 
 	private static FunctionProgramBlock copyFunction(String funcName, FunctionProgramBlock fpb) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java
new file mode 100644
index 0000000..ec10232
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+
+/**
+ * Wrapper class containing everything needed to launch a Spark remote worker
+ */
+public class SparkPSBody {
+
+	private ExecutionContext _ec;
+
+	public SparkPSBody() {
+
+	}
+
+	public SparkPSBody(ExecutionContext ec) {
+		this._ec = ec;
+	}
+
+	public ExecutionContext getEc() {
+		return _ec;
+	}
+
+	public void setEc(ExecutionContext ec) {
+		this._ec = ec;
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
index 69da56c..466801f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
@@ -19,30 +19,59 @@
 
 package org.apache.sysml.runtime.controlprogram.paramserv.spark;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.sysml.parser.Statement;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer;
+import org.apache.sysml.runtime.codegen.CodegenUtils;
+import org.apache.sysml.runtime.controlprogram.paramserv.PSWorker;
+import org.apache.sysml.runtime.controlprogram.parfor.RemoteParForUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.ProgramConverter;
 
 import scala.Tuple2;
 
-public class SparkPSWorker implements VoidFunction<Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable {
+public class SparkPSWorker extends PSWorker implements VoidFunction<Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable {
 
 	private static final long serialVersionUID = -8674739573419648732L;
 
-	public SparkPSWorker() {
+	private String _program;
+	private HashMap<String, byte[]> _clsMap;
+
+	protected SparkPSWorker() {
 		// No-args constructor used for deserialization
 	}
 
-	public SparkPSWorker(String updFunc, Statement.PSFrequency freq, int epochs, long batchSize,
-			MatrixObject valFeatures, MatrixObject valLabels, ExecutionContext ec, ParamServer ps) {
+	public SparkPSWorker(String updFunc, Statement.PSFrequency freq, int epochs, long batchSize, String program, HashMap<String, byte[]> clsMap) {
+		_updFunc = updFunc;
+		_freq = freq;
+		_epochs = epochs;
+		_batchSize = batchSize;
+		_program = program;
+		_clsMap = clsMap;
 	}
 
 	@Override
 	public void call(Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> input) throws Exception {
+		configureWorker(input);
+	}
+
+	private void configureWorker(Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> input) throws IOException {
+		_workerID = input._1;
+
+		// Initialize codegen class cache (before program parsing)
+		for (Map.Entry<String, byte[]> e : _clsMap.entrySet()) {
+			CodegenUtils.getClassSync(e.getKey(), e.getValue());
+		}
+
+		// Deserialize the body to initialize the execution context
+		SparkPSBody body = ProgramConverter.parseSparkPSBody(_program, _workerID);
+		_ec = body.getEc();
+
+	// Initialize the buffer pool and register it in the JVM shutdown hook so that it is cleaned up at the end
+		RemoteParForUtils.setupBufferPool(_workerID);
 	}
 }