You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/07/16 05:04:48 UTC
[1/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function
shipping and worker setup
Repository: systemml
Updated Branches:
refs/heads/master 614adecaf -> cffefca30
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java
new file mode 100644
index 0000000..2441116
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservSparkNNTest.java
@@ -0,0 +1,47 @@
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import org.apache.sysml.api.DMLException;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.junit.Test;
+
+public class ParamservSparkNNTest extends AutomatedTestBase {
+
+ private static final String TEST_NAME1 = "paramserv-spark-nn-bsp-batch-dc";
+
+ private static final String TEST_DIR = "functions/paramserv/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + ParamservSparkNNTest.class.getSimpleName() + "/";
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {}));
+ }
+
+ @Test
+ public void testParamservBSPBatchDisjointContiguous() {
+ runDMLTest(TEST_NAME1);
+ }
+
+ private void runDMLTest(String testname) {
+ DMLScript.RUNTIME_PLATFORM oldRtplatform = AutomatedTestBase.rtplatform;
+ boolean oldUseLocalSparkConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ AutomatedTestBase.rtplatform = DMLScript.RUNTIME_PLATFORM.SPARK;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+ try {
+ TestConfiguration config = getTestConfiguration(testname);
+ loadTestConfiguration(config);
+ programArgs = new String[] { "-explain" };
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + testname + ".dml";
+ // The test is not yet finished, so an NPE is expected at this stage
+ runTest(true, true, DMLException.class, null, -1);
+ } finally {
+ AutomatedTestBase.rtplatform = oldRtplatform;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = oldUseLocalSparkConfig;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java
new file mode 100644
index 0000000..2a08ca6
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SerializationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.instructions.cp.ListObject;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SerializationTest {
+
+ @Test
+ public void serializeUnnamedListObject() {
+ MatrixObject mo1 = generateDummyMatrix(10);
+ MatrixObject mo2 = generateDummyMatrix(20);
+ IntObject io = new IntObject(30);
+ ListObject lo = new ListObject(Arrays.asList(mo1, mo2, io));
+ String serial = ProgramConverter.serializeDataObject("key", lo);
+ Object[] obj = ProgramConverter.parseDataObject(serial);
+ ListObject actualLO = (ListObject) obj[1];
+ MatrixObject actualMO1 = (MatrixObject) actualLO.slice(0);
+ MatrixObject actualMO2 = (MatrixObject) actualLO.slice(1);
+ IntObject actualIO = (IntObject) actualLO.slice(2);
+ Assert.assertArrayEquals(mo1.acquireRead().getDenseBlockValues(), actualMO1.acquireRead().getDenseBlockValues(), 0);
+ Assert.assertArrayEquals(mo2.acquireRead().getDenseBlockValues(), actualMO2.acquireRead().getDenseBlockValues(), 0);
+ Assert.assertEquals(io.getLongValue(), actualIO.getLongValue());
+ }
+
+ @Test
+ public void serializeNamedListObject() {
+ MatrixObject mo1 = generateDummyMatrix(10);
+ MatrixObject mo2 = generateDummyMatrix(20);
+ IntObject io = new IntObject(30);
+ ListObject lo = new ListObject(Arrays.asList(mo1, mo2, io), Arrays.asList("e1", "e2", "e3"));
+
+ String serial = ProgramConverter.serializeDataObject("key", lo);
+ Object[] obj = ProgramConverter.parseDataObject(serial);
+ ListObject actualLO = (ListObject) obj[1];
+ MatrixObject actualMO1 = (MatrixObject) actualLO.slice(0);
+ MatrixObject actualMO2 = (MatrixObject) actualLO.slice(1);
+ IntObject actualIO = (IntObject) actualLO.slice(2);
+ Assert.assertEquals(lo.getNames(), actualLO.getNames());
+ Assert.assertArrayEquals(mo1.acquireRead().getDenseBlockValues(), actualMO1.acquireRead().getDenseBlockValues(), 0);
+ Assert.assertArrayEquals(mo2.acquireRead().getDenseBlockValues(), actualMO2.acquireRead().getDenseBlockValues(), 0);
+ Assert.assertEquals(io.getLongValue(), actualIO.getLongValue());
+ }
+
+ private MatrixObject generateDummyMatrix(int size) {
+ double[] dl = new double[size];
+ for (int i = 0; i < size; i++) {
+ dl[i] = i;
+ }
+ MatrixObject result = ParamservUtils.newMatrixObject(DataConverter.convertToMatrixBlock(dl, true));
+ result.exportData();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
index a4eab06..40f5a1b 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAdversarialLiteralsTest.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
import org.junit.Test;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.lops.Lop;
import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.TestConfiguration;
@@ -123,7 +123,7 @@ public class ParForAdversarialLiteralsTest extends AutomatedTestBase
// This is for running the junit test the new way, i.e., construct the arguments directly
String HOME = SCRIPT_DIR + TEST_DIR;
String IN = "A";
- String OUT = (testName.equals(TEST_NAME1a)||testName.equals(TEST_NAME1b))?ProgramConverter.CP_ROOT_THREAD_ID:"B";
+ String OUT = (testName.equals(TEST_NAME1a)||testName.equals(TEST_NAME1b))?Lop.CP_ROOT_THREAD_ID:"B";
fullDMLScriptName = HOME + TEST_NAME + ".dml";
programArgs = new String[]{"-args", input(IN),
@@ -132,7 +132,7 @@ public class ParForAdversarialLiteralsTest extends AutomatedTestBase
fullRScriptName = HOME + TEST_NAME + ".R";
rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir();
- double[][] A = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
+ double[][] A = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
writeInputMatrix("A", A, false);
boolean exceptionExpected = false;
@@ -141,8 +141,7 @@ public class ParForAdversarialLiteralsTest extends AutomatedTestBase
//compare matrices
HashMap<CellIndex, Double> dmlin = TestUtils.readDMLMatrixFromHDFS(input(IN));
HashMap<CellIndex, Double> dmlout = readDMLMatrixFromHDFS(OUT);
-
- TestUtils.compareMatrices(dmlin, dmlout, eps, "DMLin", "DMLout");
+
+ TestUtils.compareMatrices(dmlin, dmlout, eps, "DMLin", "DMLout");
}
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml b/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
index acafc88..84095ec 100644
--- a/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
+++ b/src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml
@@ -36,7 +36,7 @@ source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
train = function(matrix[double] X, matrix[double] Y,
matrix[double] X_val, matrix[double] Y_val,
int C, int Hin, int Win, int epochs, int workers,
- string utype, string freq, int batchsize, string scheme)
+ string utype, string freq, int batchsize, string scheme, string mode)
return (matrix[double] W1, matrix[double] b1,
matrix[double] W2, matrix[double] b2,
matrix[double] W3, matrix[double] b3,
@@ -108,7 +108,7 @@ train = function(matrix[double] X, matrix[double] Y,
params = list(lr=lr, mu=mu, decay=decay, C=C, Hin=Hin, Win=Win, Hf=Hf, Wf=Wf, stride=stride, pad=pad, lambda=lambda, F1=F1, F2=F2, N3=N3)
# Use paramserv function
- modelList2 = paramserv(model=modelList, features=X, labels=Y, val_features=X_val, val_labels=Y_val, upd="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::gradients", agg="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::aggregation", mode="LOCAL", utype=utype, freq=freq, epochs=epochs, batchsize=batchsize, k=workers, scheme=scheme, hyperparams=params, checkpointing="NONE")
+ modelList2 = paramserv(model=modelList, features=X, labels=Y, val_features=X_val, val_labels=Y_val, upd="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::gradients", agg="./src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml::aggregation", mode=mode, utype=utype, freq=freq, epochs=epochs, batchsize=batchsize, k=workers, scheme=scheme, hyperparams=params, checkpointing="NONE")
W1 = as.matrix(modelList2["W1"])
b1 = as.matrix(modelList2["b1"])
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml b/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
index 2279d58..ba22942 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-asp-batch.dml
@@ -42,7 +42,7 @@ workers = 2
batchsize = 32
# Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "BATCH", batchsize,"DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "BATCH", batchsize,"DISJOINT_CONTIGUOUS", "LOCAL")
# Compute validation loss & accuracy
probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml b/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
index 1824083..c8c6a2f 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-asp-epoch.dml
@@ -42,7 +42,7 @@ workers = 2
batchsize = 32
# Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "EPOCH", batchsize, "DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "ASP", "EPOCH", batchsize, "DISJOINT_CONTIGUOUS", "LOCAL")
# Compute validation loss & accuracy
probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
index 2e09de4..78fc1c4 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dc.dml
@@ -42,7 +42,7 @@ workers = 2
batchsize = 32
# Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_CONTIGUOUS", "LOCAL")
# Compute validation loss & accuracy
probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
index 8444952..9191b5a 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-dr.dml
@@ -42,7 +42,7 @@ workers = 2
batchsize = 32
# Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_RANDOM")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_RANDOM", "LOCAL")
# Compute validation loss & accuracy
probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
index ccb7ffc..ec18cb4 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-drr.dml
@@ -42,7 +42,7 @@ workers = 4
batchsize = 32
# Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_ROUND_ROBIN")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_ROUND_ROBIN", "LOCAL")
# Compute validation loss & accuracy
probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
index 4afc56b..928dde2 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-batch-or.dml
@@ -42,7 +42,7 @@ workers = 2
batchsize = 32
# Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "OVERLAP_RESHUFFLE")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "OVERLAP_RESHUFFLE", "LOCAL")
# Compute validation loss & accuracy
probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
index c542286..8605984 100644
--- a/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
+++ b/src/test/scripts/functions/paramserv/paramserv-nn-bsp-epoch.dml
@@ -42,7 +42,7 @@ workers = 2
batchsize = 32
# Train
-[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "EPOCH", batchsize,"DISJOINT_CONTIGUOUS")
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "EPOCH", batchsize,"DISJOINT_CONTIGUOUS", "LOCAL")
# Compute validation loss & accuracy
probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml b/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml
new file mode 100644
index 0000000..31d44aa
--- /dev/null
+++ b/src/test/scripts/functions/paramserv/paramserv-spark-nn-bsp-batch-dc.dml
@@ -0,0 +1,53 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+source("src/test/scripts/functions/paramserv/mnist_lenet_paramserv.dml") as mnist_lenet
+source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
+
+# Generate the training data
+[images, labels, C, Hin, Win] = mnist_lenet::generate_dummy_data()
+n = nrow(images)
+
+# Generate the training data
+[X, Y, C, Hin, Win] = mnist_lenet::generate_dummy_data()
+
+# Split into training and validation
+val_size = n * 0.1
+X = images[(val_size+1):n,]
+X_val = images[1:val_size,]
+Y = labels[(val_size+1):n,]
+Y_val = labels[1:val_size,]
+
+# Arguments
+epochs = 10
+workers = 2
+batchsize = 16
+
+# Train
+[W1, b1, W2, b2, W3, b3, W4, b4] = mnist_lenet::train(X, Y, X_val, Y_val, C, Hin, Win, epochs, workers, "BSP", "BATCH", batchsize, "DISJOINT_CONTIGUOUS", "REMOTE_SPARK")
+
+# Compute validation loss & accuracy
+probs_val = mnist_lenet::predict(X_val, C, Hin, Win, batchsize, W1, b1, W2, b2, W3, b3, W4, b4)
+loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+
+# Output results
+print("Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
index 26ea638..d1b3a6d 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
@@ -29,9 +29,10 @@ import org.junit.runners.Suite;
LocalDataPartitionerTest.class,
SparkDataPartitionerTest.class,
ParamservSyntaxTest.class,
+ SerializationTest.class,
ParamservRecompilationTest.class,
ParamservRuntimeNegativeTest.class,
- ParamservNNTest.class
+ ParamservLocalNNTest.class
})
[2/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function
shipping and worker setup
Posted by mb...@apache.org.
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index 3d61625..4e7a718 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -39,6 +39,8 @@ import static org.apache.sysml.parser.Statement.PS_UPDATE_TYPE;
import static org.apache.sysml.parser.Statement.PS_VAL_FEATURES;
import static org.apache.sysml.parser.Statement.PS_VAL_LABELS;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -54,6 +56,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.hops.recompile.Recompiler;
+import org.apache.sysml.lops.LopProperties;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -65,10 +69,12 @@ import org.apache.sysml.runtime.controlprogram.paramserv.LocalPSWorker;
import org.apache.sysml.runtime.controlprogram.paramserv.LocalParamServer;
import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer;
import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody;
import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSWorker;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.utils.Statistics;
public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruction {
@@ -110,13 +116,30 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
private void runOnSpark(SparkExecutionContext sec, PSModeType mode) {
PSScheme scheme = getScheme();
int workerNum = getWorkerNum(mode);
+ String updFunc = getParam(PS_UPDATE_FUN);
+ String aggFunc = getParam(PS_AGGREGATION_FUN);
+
+ int k = getParLevel(workerNum);
+
+ // Get the compiled execution context
+ LocalVariableMap newVarsMap = createVarsMap(sec);
+ ExecutionContext newEC = ParamservUtils.createExecutionContext(sec, newVarsMap, updFunc, aggFunc, k);
MatrixObject features = sec.getMatrixObject(getParam(PS_FEATURES));
MatrixObject labels = sec.getMatrixObject(getParam(PS_LABELS));
- SparkPSWorker worker = new SparkPSWorker();
+ // Force all the instructions to CP type
+ Recompiler.recompileProgramBlockHierarchy2Forced(
+ newEC.getProgram().getProgramBlocks(), 0, new HashSet<>(), LopProperties.ExecType.CP);
+
+ // Serialize all the needed params for remote workers
+ SparkPSBody body = new SparkPSBody(newEC);
+ HashMap<String, byte[]> clsMap = new HashMap<>();
+ String program = ProgramConverter.serializeSparkPSBody(body, clsMap);
+
+ SparkPSWorker worker = new SparkPSWorker(getParam(PS_UPDATE_FUN), getFrequency(), getEpochs(), getBatchSize(), program, clsMap);
ParamservUtils.doPartitionOnSpark(sec, features, labels, scheme, workerNum) // Do data partitioning
- .foreach(worker); // Run remote workers
+ .foreach(worker); // Run remote workers
}
@@ -132,15 +155,14 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
int k = getParLevel(workerNum);
// Get the compiled execution context
- // Create workers' execution context
LocalVariableMap newVarsMap = createVarsMap(ec);
- List<ExecutionContext> newECs = ParamservUtils.createExecutionContexts(ec, newVarsMap, updFunc, aggFunc, workerNum, k);
+ ExecutionContext newEC = ParamservUtils.createExecutionContext(ec, newVarsMap, updFunc, aggFunc, k);
// Create workers' execution context
- List<ExecutionContext> workerECs = newECs.subList(0, newECs.size() - 1);
+ List<ExecutionContext> workerECs = ParamservUtils.copyExecutionContext(newEC, workerNum);
// Create the agg service's execution context
- ExecutionContext aggServiceEC = newECs.get(newECs.size() - 1);
+ ExecutionContext aggServiceEC = ParamservUtils.copyExecutionContext(newEC, 1).get(0);
PSFrequency freq = getFrequency();
PSUpdateType updateType = getUpdateType();
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
index 7bf941d..a1c89a3 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
@@ -40,7 +40,7 @@ import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java
new file mode 100644
index 0000000..1d2115e
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/util/ProgramConverter.java
@@ -0,0 +1,1838 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.stream.Collectors;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.CompilerConfig.ConfigType;
+import org.apache.sysml.conf.CompilerConfig;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.hops.Hop;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.hops.recompile.Recompiler;
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.parser.DMLProgram;
+import org.apache.sysml.parser.DataIdentifier;
+import org.apache.sysml.parser.ForStatementBlock;
+import org.apache.sysml.parser.IfStatementBlock;
+import org.apache.sysml.parser.ParForStatementBlock;
+import org.apache.sysml.parser.StatementBlock;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
+import org.apache.sysml.parser.WhileStatementBlock;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.codegen.CodegenUtils;
+import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock;
+import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP;
+import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock;
+import org.apache.sysml.runtime.controlprogram.IfProgramBlock;
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.Program;
+import org.apache.sysml.runtime.controlprogram.ProgramBlock;
+import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSBody;
+import org.apache.sysml.runtime.controlprogram.parfor.ParForBody;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.instructions.CPInstructionParser;
+import org.apache.sysml.runtime.instructions.Instruction;
+import org.apache.sysml.runtime.instructions.InstructionParser;
+import org.apache.sysml.runtime.instructions.MRJobInstruction;
+import org.apache.sysml.runtime.instructions.cp.BooleanObject;
+import org.apache.sysml.runtime.instructions.cp.CPInstruction;
+import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.instructions.cp.DoubleObject;
+import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
+import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.instructions.cp.ListObject;
+import org.apache.sysml.runtime.instructions.cp.ScalarObject;
+import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction;
+import org.apache.sysml.runtime.instructions.cp.StringObject;
+import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysml.runtime.instructions.gpu.GPUInstruction;
+import org.apache.sysml.runtime.instructions.mr.MRInstruction;
+import org.apache.sysml.runtime.instructions.spark.SPInstruction;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.MetaDataFormat;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.udf.ExternalFunctionInvocationInstruction;
+
+/**
+ * Program converter functionalities for
+ * (1) creating deep copies of program blocks, instructions, function program blocks, and
+ * (2) serializing and parsing of programs, program blocks, functions program blocks.
+ *
+ */
+//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design)
+public class ProgramConverter
+{
+ protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName());
+
+ //use escaped unicodes for separators in order to prevent string conflict
+ //(rarely-used unicode characters avoid collisions with user data inside serialized text)
+ public static final String NEWLINE = "\n"; //System.lineSeparator();
+ public static final String COMPONENTS_DELIM = "\u236e"; //semicolon w/ bar; ";";
+ public static final String ELEMENT_DELIM = "\u236a"; //comma w/ bar; ",";
+ public static final String ELEMENT_DELIM2 = ",";
+ public static final String DATA_FIELD_DELIM = "\u007c"; //"|";
+ public static final String KEY_VALUE_DELIM = "\u003d"; //"=";
+ public static final String LEVELIN = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{";
+ public static final String LEVELOUT = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}";
+ public static final String EMPTY = "null";
+ public static final String DASH = "-";
+ public static final String REF = "ref";
+ public static final String LIST_ELEMENT_DELIM = "\t";
+
+ public static final String CDATA_BEGIN = "<![CDATA[";
+ public static final String CDATA_END = " ]]>";
+
+ //section markers for the serialized program, variables, instructions,
+ //execution context, and the individual program-block types
+ public static final String PROG_BEGIN = " PROG" + LEVELIN;
+ public static final String PROG_END = LEVELOUT;
+ public static final String VARS_BEGIN = "VARS: ";
+ public static final String VARS_END = "";
+ public static final String PBS_BEGIN = " PBS" + LEVELIN;
+ public static final String PBS_END = LEVELOUT;
+ public static final String INST_BEGIN = " INST: ";
+ public static final String INST_END = "";
+ public static final String EC_BEGIN = " EC: ";
+ public static final String EC_END = "";
+ public static final String PB_BEGIN = " PB" + LEVELIN;
+ public static final String PB_END = LEVELOUT;
+ public static final String PB_WHILE = " WHILE" + LEVELIN;
+ public static final String PB_FOR = " FOR" + LEVELIN;
+ public static final String PB_PARFOR = " PARFOR" + LEVELIN;
+ public static final String PB_IF = " IF" + LEVELIN;
+ public static final String PB_FC = " FC" + LEVELIN;
+ public static final String PB_EFC = " EFC" + LEVELIN;
+
+ //key for propagating the statistics flag (serialized as CONF_STATS=<boolean>)
+ public static final String CONF_STATS = "stats";
+
+ // Used for parfor
+ public static final String PARFORBODY_BEGIN = CDATA_BEGIN + "PARFORBODY" + LEVELIN;
+ public static final String PARFORBODY_END = LEVELOUT + CDATA_END;
+
+ // Used for paramserv builtin function
+ public static final String PSBODY_BEGIN = CDATA_BEGIN + "PSBODY" + LEVELIN;
+ public static final String PSBODY_END = LEVELOUT + CDATA_END;
+
+ //exception msgs
+ //NOTE(review): "PRogramBlockCP" below is a typo in the runtime message text itself;
+ //left unchanged here because it is part of the emitted error message.
+ public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " +
+ "(ExternalFunctionPRogramBlockCP can be used)";
+ public static final String NOT_SUPPORTED_MR_INSTRUCTION = "Not supported: Instructions of type other than CP instructions";
+ public static final String NOT_SUPPORTED_MR_PARFOR = "Not supported: Nested ParFOR REMOTE_MR due to possible deadlocks." +
+ "(LOCAL can be used for innner ParFOR)";
+ public static final String NOT_SUPPORTED_PB = "Not supported: type of program block";
+
+ ////////////////////////////////
+ // CREATION of DEEP COPIES
+ ////////////////////////////////
+
+ /**
+ * Creates a deep copy of the given execution context.
+ * For rt_platform=Hadoop, execution context has a symbol table.
+ * The symbol table is cloned; matrix objects flagged for in-place updates
+ * additionally get private matrix-block copies so that parallel workers
+ * do not write into shared data.
+ *
+ * @param ec execution context
+ * @return execution context
+ * @throws CloneNotSupportedException if CloneNotSupportedException occurs
+ */
+ public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec)
+ throws CloneNotSupportedException
+ {
+ ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram());
+ cpec.setVariables((LocalVariableMap) ec.getVariables().clone());
+
+ //handle result variables with in-place update flag
+ //(each worker requires its own copy of the empty matrix object)
+ for( String var : cpec.getVariables().keySet() ) {
+ Data dat = cpec.getVariables().get(var);
+ if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) {
+ MatrixObject mo = (MatrixObject)dat;
+ MatrixObject moNew = new MatrixObject(mo);
+ if( mo.getNnz() != 0 ){
+ // If output matrix is not empty (NNZ != 0), then local copy is created so that
+ // update in place operation can be applied.
+ MatrixBlock mbVar = mo.acquireRead();
+ moNew.acquireModify (new MatrixBlock(mbVar));
+ mo.release();
+ } else {
+ //create empty matrix block w/ dense representation (preferred for update in-place)
+ //Creating a dense matrix block is valid because empty block not allocated and transfer
+ // to sparse representation happens in left indexing in place operation.
+ moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false));
+ }
+ moNew.release();
+ cpec.setVariable(var, moNew);
+ }
+ }
+
+ return cpec;
+ }
+
+ /**
+ * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the
+ * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order
+ * to support arbitrary control-flow constructs within a parfor.
+ *
+ * @param childBlocks child program blocks
+ * @param pid unique id of the parallel worker; also set as thread id on copied blocks
+ * @param IDPrefix id prefix of the remote parworker, or -1 if still on the master node
+ * @param fnStack set of function keys currently being copied (guards against infinite recursion on recursive functions)
+ * @param fnCreated set of function keys for which copies have already been created
+ * @param plain if true, full deep copy without id replacement
+ * @param forceDeepCopy if true, force deep copy
+ * @return list of program blocks
+ */
+ public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy)
+ {
+ ArrayList<ProgramBlock> tmp = new ArrayList<>();
+
+ for( ProgramBlock pb : childBlocks )
+ {
+ Program prog = pb.getProgram();
+ ProgramBlock tmpPB = null;
+
+ if( pb instanceof WhileProgramBlock ) {
+ tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+ }
+ else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) {
+ tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy );
+ }
+ else if( pb instanceof ParForProgramBlock ) {
+ ParForProgramBlock pfpb = (ParForProgramBlock) pb;
+ //without nested parallelism, a nested parfor is copied as a plain for loop
+ if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM )
+ tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+ else
+ tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+ }
+ else if( pb instanceof IfProgramBlock ) {
+ tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
+ }
+ else { //last-level program block
+ tmpPB = new ProgramBlock(prog); // general case use for most PBs
+
+ //for recompile in the master node JVM
+ tmpPB.setStatementBlock(createStatementBlockCopy(pb.getStatementBlock(), pid, plain, forceDeepCopy));
+ //tmpPB.setStatementBlock(pb.getStatementBlock());
+ tmpPB.setThreadID(pid);
+ }
+
+ //copy instructions
+ tmpPB.setInstructions( createDeepCopyInstructionSet(pb.getInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+
+ //copy symbol table
+ //tmpPB.setVariables( pb.getVariables() ); //implicit cloning
+
+ tmp.add(tmpPB);
+ }
+
+ return tmp;
+ }
+
+ /** Creates a deep copy of a while program block, incl. predicate/exit instructions and child blocks. */
+ public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+ ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
+ WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst);
+ tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) );
+ tmpPB.setThreadID(pid);
+ tmpPB.setExitInstructions2( createDeepCopyInstructionSet(wpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
+ tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
+ return tmpPB;
+ }
+
+ /** Creates a deep copy of an if program block, incl. predicate/exit instructions and both branch bodies. */
+ public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+ ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
+ IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst);
+ tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) );
+ tmpPB.setThreadID(pid);
+ tmpPB.setExitInstructions2( createDeepCopyInstructionSet(ipb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
+ tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
+ tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
+ return tmpPB;
+ }
+
+ /** Creates a deep copy of a for program block, incl. from/to/increment/exit instructions and child blocks. */
+ public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+ ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
+ tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy));
+ tmpPB.setThreadID(pid);
+ tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+ tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+ tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+ tmpPB.setExitInstructions( createDeepCopyInstructionSet(fpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+ tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) );
+ return tmpPB;
+ }
+
+ /** Creates a shallow copy of a for program block: a new block object that reuses the original instruction lists and child blocks. */
+ public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) {
+ ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
+ tmpPB.setFromInstructions( fpb.getFromInstructions() );
+ tmpPB.setToInstructions( fpb.getToInstructions() );
+ tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() );
+ tmpPB.setExitInstructions( fpb.getExitInstructions() );
+ tmpPB.setChildBlocks( fpb.getChildBlocks() );
+ return tmpPB;
+ }
+
+ /**
+ * Creates a deep copy of a parfor program block. Optimization and monitoring
+ * are disabled on the copy (already done in the top-level parfor); child blocks
+ * are deep-copied only if plain or forceDeepCopy is requested.
+ */
+ public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
+ ParForProgramBlock tmpPB = null;
+
+ if( IDPrefix == -1 ) //still on master node
+ tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
+ else //child of remote ParWorker at any level
+ tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
+
+ tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
+ tmpPB.setThreadID(pid);
+
+ tmpPB.disableOptimization(); //already done in top-level parfor
+ tmpPB.disableMonitorReport(); //already done in top-level parfor
+
+ tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+ tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+ tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+ tmpPB.setExitInstructions( createDeepCopyInstructionSet(pfpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
+
+ //NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway
+ //and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested.
+ if( plain || forceDeepCopy )
+ tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) );
+ else
+ tmpPB.setChildBlocks( pfpb.getChildBlocks() );
+
+ return tmpPB;
+ }
+
+ /**
+ * This creates a deep copy of a function program block. The central reference to singletons of function program blocks
+ * poses the need for explicit copies in order to prevent conflicting writes of temporary variables (see ExternalFunctionProgramBlock).
+ * The copy is registered in the program under the new function name and added to fnCreated.
+ *
+ * @param namespace function namespace
+ * @param oldName name of the function to copy
+ * @param pid unique id of the parallel worker; appended to the new function name unless plain
+ * @param IDPrefix id prefix of the remote parworker, or -1 if still on the master node
+ * @param prog runtime program
+ * @param fnStack set of function keys currently being copied (guards against infinite recursion on recursive functions)
+ * @param fnCreated set of function keys for which copies have already been created
+ * @param plain if true, the original function name is preserved (no pid suffix)
+ */
+ public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain)
+ {
+ //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock)
+ FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName);
+ String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid);
+ String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew);
+
+ if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) )
+ return; //prevent redundant deep copy if already existent
+
+ //create deep copy
+ FunctionProgramBlock copy = null;
+ ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
+ ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
+ if( fpb.getInputParams()!= null )
+ tmp1.addAll(fpb.getInputParams());
+ if( fpb.getOutputParams()!= null )
+ tmp2.addAll(fpb.getOutputParams());
+
+ if( fpb instanceof ExternalFunctionProgramBlockCP ) {
+ ExternalFunctionProgramBlockCP efpb = (ExternalFunctionProgramBlockCP) fpb;
+ HashMap<String,String> tmp3 = efpb.getOtherParams();
+ //replace thread-id references in the base directory to avoid filename conflicts
+ if( IDPrefix!=-1 )
+ copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_CHILD_THREAD+IDPrefix,Lop.CP_CHILD_THREAD+pid));
+ else
+ copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_ROOT_THREAD_ID,Lop.CP_CHILD_THREAD+pid));
+ }
+ else if( fpb instanceof ExternalFunctionProgramBlock ) {
+ ExternalFunctionProgramBlock efpb = (ExternalFunctionProgramBlock) fpb;
+ HashMap<String,String> tmp3 = efpb.getOtherParams();
+ if( IDPrefix!=-1 )
+ copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_CHILD_THREAD+IDPrefix, Lop.CP_CHILD_THREAD+pid));
+ else
+ copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid));
+ }
+ else {
+ if( !fnStack.contains(fnameNewKey) ) {
+ fnStack.add(fnameNewKey);
+ copy = new FunctionProgramBlock(prog, tmp1, tmp2);
+ copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
+ copy.setRecompileOnce( fpb.isRecompileOnce() );
+ copy.setThreadID(pid);
+ fnStack.remove(fnameNewKey);
+ }
+ else //stop deep copy for recursive function calls
+ copy = fpb;
+ }
+
+ //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
+ //note: instructions not used by function program block
+
+ //put
+ prog.addFunctionProgramBlock(namespace, fnameNew, copy);
+ fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
+ }
+
+ /**
+ * Creates a plain deep copy of the given function program block (pid=0, no id prefix,
+ * plain=true), preserving input/output parameters, statement block, and recompile-once flag.
+ *
+ * @param fpb function program block to copy (must not be null)
+ * @param fnStack set of function keys currently being copied (recursion guard)
+ * @param fnCreated set of function keys for which copies have already been created
+ * @return deep copy of the function program block
+ */
+ public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, HashSet<String> fnStack, HashSet<String> fnCreated)
+ {
+ if( fpb == null )
+ throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock.");
+
+ //create deep copy
+ FunctionProgramBlock copy = null;
+ ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
+ ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
+ if( fpb.getInputParams()!= null )
+ tmp1.addAll(fpb.getInputParams());
+ if( fpb.getOutputParams()!= null )
+ tmp2.addAll(fpb.getOutputParams());
+
+ copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
+ copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) );
+ copy.setStatementBlock( fpb.getStatementBlock() );
+ copy.setRecompileOnce(fpb.isRecompileOnce());
+ //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
+ //note: instructions not used by function program block
+
+ return copy;
+ }
+
+
+ /**
+ * Creates a deep copy of an array of instructions and replaces the placeholders of parworker
+ * IDs with the concrete IDs of this parfor instance. This is a helper method used for generating
+ * deep copies of program blocks.
+ *
+ * @param instSet list of instructions
+ * @param pid unique id of the parallel worker, used to replace thread-id placeholders
+ * @param IDPrefix id prefix of the remote parworker, or -1 if still on the master node
+ * @param prog runtime program
+ * @param fnStack set of function keys currently being copied (recursion guard)
+ * @param fnCreated set of function keys for which copies have already been created
+ * @param plain if true, function names are preserved (no pid suffix)
+ * @param cpFunctions if true, deep copies of called function program blocks are created as well
+ * @return list of instructions
+ */
+ public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean cpFunctions) {
+ ArrayList<Instruction> tmp = new ArrayList<>();
+ for( Instruction inst : instSet ) {
+ //deep copy called functions before cloning the call instruction itself
+ if( inst instanceof FunctionCallCPInstruction && cpFunctions ) {
+ FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst;
+ createDeepCopyFunctionProgramBlock( finst.getNamespace(),
+ finst.getFunctionName(), pid, IDPrefix, prog, fnStack, fnCreated, plain );
+ }
+ tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) );
+ }
+ return tmp;
+ }
+
+ /**
+ * Clones a single instruction by serializing it to its string form and re-parsing it
+ * (MRJobInstructions are cloned via their copy constructor instead). Thread-id
+ * placeholders in the instruction string are replaced with the given pid.
+ *
+ * @param oInst instruction to clone
+ * @param pid unique id of the parallel worker, used for thread-id replacement
+ * @param plain if true, function names are preserved (no pid suffix)
+ * @param cpFunctions if true, function-call instructions are renamed to the copied functions
+ * @return cloned instruction
+ */
+ public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions )
+ {
+ Instruction inst = null;
+ String tmpString = oInst.toString();
+
+ try
+ {
+ if( oInst instanceof CPInstruction || oInst instanceof SPInstruction || oInst instanceof MRInstruction
+ || oInst instanceof GPUInstruction ) {
+ if( oInst instanceof FunctionCallCPInstruction && cpFunctions ) {
+ FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst;
+ if( !plain ) {
+ //safe replacement because target variables might include the function name
+ //note: this is no update-in-place in order to keep the original function name as basis
+ tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + Lop.CP_CHILD_THREAD+pid);
+ }
+ //otherwise: preserve function name
+ }
+ inst = InstructionParser.parseSingleInstruction(tmpString);
+ }
+ else if( oInst instanceof MRJobInstruction ) {
+ //clone via copy constructor
+ inst = new MRJobInstruction( (MRJobInstruction)oInst );
+ }
+ else
+ throw new DMLRuntimeException("Failed to clone instruction: "+oInst);
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ //save replacement of thread id references in instructions
+ inst = saveReplaceThreadID( inst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid);
+
+ return inst;
+ }
+
+ /**
+ * Creates a copy of a generic statement block for concurrent recompilation:
+ * a deep copy of the hops dag is created only if parallel dynamic recompilation
+ * is enabled and the block requires recompilation (or forceDeepCopy); otherwise
+ * the original statement block is returned unchanged.
+ *
+ * @param sb statement block to copy (may be null)
+ * @param pid unique id of the parallel worker, used to update function names unless plain
+ * @param plain if true, function names in the copied hops dag are preserved
+ * @param forceDeepCopy if true, force a deep copy even without recompilation need
+ * @return copied (or original) statement block
+ */
+ public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
+ {
+ StatementBlock ret = null;
+
+ try
+ {
+ if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
+ && sb != null //forced deep copy for function recompilation
+ && (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy) )
+ {
+ //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+ ret = new StatementBlock();
+ ret.setDMLProg(sb.getDMLProg());
+ ret.setParseInfo(sb);
+ ret.setLiveIn( sb.liveIn() );
+ ret.setLiveOut( sb.liveOut() );
+ ret.setUpdatedVariables( sb.variablesUpdated() );
+ ret.setReadVariables( sb.variablesRead() );
+
+ //deep copy hops dag for concurrent recompile
+ ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() );
+ if( !plain )
+ Recompiler.updateFunctionNames( hops, pid );
+ ret.setHops( hops );
+ ret.updateRecompilationFlag();
+ }
+ else {
+ ret = sb;
+ }
+ }
+ catch( Exception ex ) {
+ throw new DMLRuntimeException( ex );
+ }
+
+ return ret;
+ }
+
+ /**
+ * Creates a copy of an if statement block for concurrent recompilation
+ * (deep copy of the predicate hops dag only if needed, otherwise the original is returned).
+ */
+ public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
+ {
+ IfStatementBlock ret = null;
+
+ try
+ {
+ if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
+ && sb != null //forced deep copy for function recompile
+ && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) )
+ {
+ //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+ ret = new IfStatementBlock();
+ ret.setDMLProg(sb.getDMLProg());
+ ret.setParseInfo(sb);
+ ret.setLiveIn( sb.liveIn() );
+ ret.setLiveOut( sb.liveOut() );
+ ret.setUpdatedVariables( sb.variablesUpdated() );
+ ret.setReadVariables( sb.variablesRead() );
+
+ //shallow copy child statements
+ ret.setStatements( sb.getStatements() );
+
+ //deep copy predicate hops dag for concurrent recompile
+ Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
+ ret.setPredicateHops( hops );
+ ret.updatePredicateRecompilationFlag();
+ }
+ else {
+ ret = sb;
+ }
+ }
+ catch( Exception ex ) {
+ throw new DMLRuntimeException( ex );
+ }
+
+ return ret;
+ }
+
+ /**
+ * Creates a copy of a while statement block for concurrent recompilation
+ * (deep copy of the predicate hops dag only if needed, otherwise the original is returned).
+ */
+ public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
+ {
+ WhileStatementBlock ret = null;
+
+ try
+ {
+ if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
+ && sb != null //forced deep copy for function recompile
+ && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) )
+ {
+ //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+ ret = new WhileStatementBlock();
+ ret.setDMLProg(sb.getDMLProg());
+ ret.setParseInfo(sb);
+ ret.setLiveIn( sb.liveIn() );
+ ret.setLiveOut( sb.liveOut() );
+ ret.setUpdatedVariables( sb.variablesUpdated() );
+ ret.setReadVariables( sb.variablesRead() );
+ ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
+
+ //shallow copy child statements
+ ret.setStatements( sb.getStatements() );
+
+ //deep copy predicate hops dag for concurrent recompile
+ Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
+ ret.setPredicateHops( hops );
+ ret.updatePredicateRecompilationFlag();
+ }
+ else {
+ ret = sb;
+ }
+ }
+ catch( Exception ex ) {
+ throw new DMLRuntimeException( ex );
+ }
+
+ return ret;
+ }
+
+ /**
+ * Creates a copy of a (par)for statement block for concurrent recompilation
+ * (deep copies of the from/to/increment hops dags only where needed, otherwise the original is returned).
+ */
+ public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
+ {
+ ForStatementBlock ret = null;
+
+ try
+ {
+ if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
+ && sb != null
+ && ( Recompiler.requiresRecompilation(sb.getFromHops()) ||
+ Recompiler.requiresRecompilation(sb.getToHops()) ||
+ Recompiler.requiresRecompilation(sb.getIncrementHops()) ||
+ forceDeepCopy ) )
+ {
+ //preserve the parfor subtype of the statement block
+ ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock();
+
+ //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
+ ret.setDMLProg(sb.getDMLProg());
+ ret.setParseInfo(sb);
+ ret.setLiveIn( sb.liveIn() );
+ ret.setLiveOut( sb.liveOut() );
+ ret.setUpdatedVariables( sb.variablesUpdated() );
+ ret.setReadVariables( sb.variablesRead() );
+ ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
+
+ //shallow copy child statements
+ ret.setStatements( sb.getStatements() );
+
+ //deep copy predicate hops dag for concurrent recompile
+ if( sb.requiresFromRecompilation() ){
+ Hop hops = Recompiler.deepCopyHopsDag( sb.getFromHops() );
+ ret.setFromHops( hops );
+ }
+ if( sb.requiresToRecompilation() ){
+ Hop hops = Recompiler.deepCopyHopsDag( sb.getToHops() );
+ ret.setToHops( hops );
+ }
+ if( sb.requiresIncrementRecompilation() ){
+ Hop hops = Recompiler.deepCopyHopsDag( sb.getIncrementHops() );
+ ret.setIncrementHops( hops );
+ }
+ ret.updatePredicateRecompilationFlags();
+ }
+ else {
+ ret = sb;
+ }
+ }
+ catch( Exception ex ) {
+ throw new DMLRuntimeException( ex );
+ }
+
+ return ret;
+ }
+
+
+ ////////////////////////////////
+ // SERIALIZATION
+ ////////////////////////////////
+
+ /**
+ * Serializes a paramserv spark body into a single string for function shipping
+ * to remote workers. The serialized form contains, in order: the DMLScript UUID,
+ * the DML config, additional configuration flags (statistics), the function
+ * program blocks, the execution context, and the program blocks.
+ *
+ * @param body spark paramserv body (provides the execution context)
+ * @param clsMap map collecting generated codegen class bytes during serialization
+ * @return serialized body, delimited by PSBODY_BEGIN/PSBODY_END
+ */
+ public static String serializeSparkPSBody(SparkPSBody body, HashMap<String, byte[]> clsMap) {
+
+ ExecutionContext ec = body.getEc();
+ StringBuilder builder = new StringBuilder();
+ builder.append(PSBODY_BEGIN);
+ builder.append(NEWLINE);
+
+ //handle DMLScript UUID (propagate original uuid for writing to scratch space)
+ builder.append(DMLScript.getUUID());
+ builder.append(COMPONENTS_DELIM);
+ builder.append(NEWLINE);
+
+ //handle DML config
+ builder.append(ConfigurationManager.getDMLConfig().serializeDMLConfig());
+ builder.append(COMPONENTS_DELIM);
+ builder.append(NEWLINE);
+
+ //handle additional configurations
+ builder.append(CONF_STATS + "=" + DMLScript.STATISTICS);
+ builder.append(COMPONENTS_DELIM);
+ builder.append(NEWLINE);
+
+ //handle program
+ builder.append(PROG_BEGIN);
+ builder.append(NEWLINE);
+ builder.append(rSerializeFunctionProgramBlocks(ec.getProgram().getFunctionProgramBlocks(),
+ new HashSet<>(ec.getProgram().getFunctionProgramBlocks().keySet()), clsMap));
+ builder.append(PROG_END);
+ builder.append(NEWLINE);
+ builder.append(COMPONENTS_DELIM);
+ builder.append(NEWLINE);
+
+ //handle execution context
+ builder.append(EC_BEGIN);
+ builder.append(serializeExecutionContext(ec));
+ builder.append(EC_END);
+ builder.append(NEWLINE);
+ builder.append(COMPONENTS_DELIM);
+ builder.append(NEWLINE);
+
+ //handle program blocks
+ builder.append(PBS_BEGIN);
+ builder.append(NEWLINE);
+ builder.append(rSerializeProgramBlocks(ec.getProgram().getProgramBlocks(), clsMap));
+ builder.append(PBS_END);
+ builder.append(NEWLINE);
+
+ builder.append(PSBODY_END);
+
+ return builder.toString();
+ }
+
+ /**
+ * Serializes a parfor body without collecting generated class bytecode
+ * (delegates with a throw-away class map).
+ *
+ * @param body parfor body (child blocks, result variables, execution context)
+ * @return serialized string representation
+ */
+ public static String serializeParForBody( ParForBody body ) {
+ return serializeParForBody(body, new HashMap<String, byte[]>());
+ }
+
+ /**
+ * Serializes a parfor body into a single string. Layout (mirrored by
+ * parseParForBody): PARFORBODY_BEGIN, DMLScript UUID, DML config, additional
+ * configs (stats flag), program (only functions reachable from the child
+ * blocks), result variables, execution context, program blocks,
+ * PARFORBODY_END, separated by COMPONENTS_DELIM.
+ *
+ * @param body parfor body (child blocks, result variables, execution context)
+ * @param clsMap output map filled with bytecode of generated codegen classes
+ * @return serialized string representation
+ */
+ public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap )
+ {
+ ArrayList<ProgramBlock> pbs = body.getChildBlocks();
+ ArrayList<ResultVar> rVnames = body.getResultVariables();
+ ExecutionContext ec = body.getEc();
+
+ //shortcut for empty bodies: just the begin/end markers
+ if( pbs.isEmpty() )
+ return PARFORBODY_BEGIN + PARFORBODY_END;
+
+ Program prog = pbs.get( 0 ).getProgram();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append( PARFORBODY_BEGIN );
+ sb.append( NEWLINE );
+
+ //handle DMLScript UUID (propagate original uuid for writing to scratch space)
+ sb.append( DMLScript.getUUID() );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( NEWLINE );
+
+ //handle DML config
+ sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( NEWLINE );
+
+ //handle additional configurations
+ sb.append( CONF_STATS + "=" + DMLScript.STATISTICS );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( NEWLINE );
+
+ //handle program
+ sb.append(PROG_BEGIN);
+ sb.append( NEWLINE );
+ sb.append( serializeProgram(prog, pbs, clsMap) );
+ sb.append(PROG_END);
+ sb.append( NEWLINE );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( NEWLINE );
+
+ //handle result variable names
+ sb.append( serializeResultVariables(rVnames) );
+ sb.append( COMPONENTS_DELIM );
+
+ //handle execution context
+ //note: this includes also the symbol table (serialize only the top-level variable map,
+ // (symbol tables for nested/child blocks are created at parse time, on the remote side)
+ sb.append(EC_BEGIN);
+ sb.append( serializeExecutionContext(ec) );
+ sb.append(EC_END);
+ sb.append( NEWLINE );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( NEWLINE );
+
+ //handle program blocks
+ sb.append(PBS_BEGIN);
+ sb.append( NEWLINE );
+ sb.append( rSerializeProgramBlocks(pbs, clsMap) );
+ sb.append(PBS_END);
+ sb.append( NEWLINE );
+
+ sb.append( PARFORBODY_END );
+
+ return sb.toString();
+ }
+
+ /**
+ * Serializes only the function program blocks of a program that are
+ * (transitively) reachable from the given program blocks, in order to
+ * avoid shipping unused functions.
+ *
+ * @param prog program holding the function program blocks
+ * @param pbs program blocks from which reachability is computed
+ * @param clsMap output map for generated codegen class bytecode
+ * @return serialized function program blocks
+ */
+ private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) {
+ //note program contains variables, programblocks and function program blocks
+ //but in order to avoid redundancy, we only serialize function program blocks
+ HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks();
+ HashSet<String> cand = new HashSet<>();
+ rFindSerializationCandidates(pbs, cand);
+ return rSerializeFunctionProgramBlocks( fpb, cand, clsMap );
+ }
+
+ /**
+ * Recursively collects the keys of all functions called from the given
+ * program blocks (descending into while/for/parfor/if bodies and following
+ * chains of function calls); the candidate set doubles as a memo table,
+ * which also guards against infinite recursion for recursive functions.
+ *
+ * @param pbs program blocks to scan for function call instructions
+ * @param cand output set of function keys (namespace::name)
+ */
+ private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand )
+ {
+ for( ProgramBlock pb : pbs )
+ {
+ if( pb instanceof WhileProgramBlock ) {
+ WhileProgramBlock wpb = (WhileProgramBlock) pb;
+ rFindSerializationCandidates(wpb.getChildBlocks(), cand );
+ }
+ else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock ) {
+ ForProgramBlock fpb = (ForProgramBlock) pb;
+ rFindSerializationCandidates(fpb.getChildBlocks(), cand);
+ }
+ else if ( pb instanceof IfProgramBlock ) {
+ IfProgramBlock ipb = (IfProgramBlock) pb;
+ rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand);
+ if( ipb.getChildBlocksElseBody() != null )
+ rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand);
+ }
+ else { //all generic program blocks
+ for( Instruction inst : pb.getInstructions() )
+ if( inst instanceof FunctionCallCPInstruction ) {
+ FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst;
+ String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName());
+ if( !cand.contains(fkey) ) { //memoization for multiple calls, recursion
+ cand.add( fkey ); //add to candidates
+ //investigate chains of function calls
+ FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName());
+ rFindSerializationCandidates(fpb.getChildBlocks(), cand);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Serializes a variable map, wrapped in VARS_BEGIN/VARS_END markers.
+ *
+ * @param vars local variable map (symbol table) to serialize
+ * @return serialized variables string
+ */
+ private static String serializeVariables (LocalVariableMap vars) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(VARS_BEGIN);
+ sb.append( vars.serialize() );
+ sb.append(VARS_END);
+ return sb.toString();
+ }
+
+ /**
+ * Serializes a single data object (scalar, matrix, or list) into a
+ * delimited string. Scalars are serialized by value, matrices by file name
+ * plus matrix characteristics, and lists by recursively serializing their
+ * elements (element names derived as &lt;name&gt;-&lt;index&gt;).
+ *
+ * @param key variable name used as the serialized object name
+ * @param dat data object to serialize
+ * @return serialized data object string
+ * @throws DMLRuntimeException if the data type is not supported
+ */
+ public static String serializeDataObject(String key, Data dat)
+ {
+ // SCHEMA: <name>|<datatype>|<valuetype>|value
+ // (scalars are serialize by value, matrices by filename)
+ StringBuilder sb = new StringBuilder();
+ //prepare data for serialization
+ String name = key;
+ DataType datatype = dat.getDataType();
+ ValueType valuetype = dat.getValueType();
+ String value = null;
+ String[] metaData = null;
+ String[] listData = null;
+ switch( datatype )
+ {
+ case SCALAR:
+ ScalarObject so = (ScalarObject) dat;
+ //name = so.getName();
+ value = so.getStringValue();
+ break;
+ case MATRIX:
+ MatrixObject mo = (MatrixObject) dat;
+ MetaDataFormat md = (MetaDataFormat) dat.getMetaData();
+ MatrixCharacteristics mc = md.getMatrixCharacteristics();
+ value = mo.getFileName();
+ PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat(
+ mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE;
+ //matrix meta data: dims, blocksizes, nnz, formats, partitioning, update type
+ metaData = new String[9];
+ metaData[0] = String.valueOf( mc.getRows() );
+ metaData[1] = String.valueOf( mc.getCols() );
+ metaData[2] = String.valueOf( mc.getRowsPerBlock() );
+ metaData[3] = String.valueOf( mc.getColsPerBlock() );
+ metaData[4] = String.valueOf( mc.getNonZeros() );
+ metaData[5] = InputInfo.inputInfoToString( md.getInputInfo() );
+ metaData[6] = OutputInfo.outputInfoToString( md.getOutputInfo() );
+ metaData[7] = String.valueOf( partFormat );
+ metaData[8] = String.valueOf( mo.getUpdateType() );
+ break;
+ case LIST:
+ // SCHEMA: <name>|<datatype>|<valuetype>|value|<metadata>|<tab>element1<tab>element2<tab>element3 (this is the list)
+ // (for the element1) <listName-index>|<datatype>|<valuetype>|value
+ // (for the element2) <listName-index>|<datatype>|<valuetype>|value
+ ListObject lo = (ListObject) dat;
+ value = REF; //list value is a reference; elements carry actual values
+ metaData = new String[2];
+ metaData[0] = String.valueOf(lo.getLength());
+ metaData[1] = lo.getNames() == null ? EMPTY : serializeList(lo.getNames(), ELEMENT_DELIM2);
+ listData = new String[lo.getLength()];
+ for (int index = 0; index < lo.getLength(); index++) {
+ listData[index] = serializeDataObject(name + DASH + index, lo.slice(index));
+ }
+ break;
+ default:
+ throw new DMLRuntimeException("Unable to serialize datatype "+datatype);
+ }
+
+ //serialize data
+ sb.append(name);
+ sb.append(DATA_FIELD_DELIM);
+ sb.append(datatype);
+ sb.append(DATA_FIELD_DELIM);
+ sb.append(valuetype);
+ sb.append(DATA_FIELD_DELIM);
+ sb.append(value);
+ if( metaData != null )
+ for( int i=0; i<metaData.length; i++ ) {
+ sb.append(DATA_FIELD_DELIM);
+ sb.append(metaData[i]);
+ }
+ if (listData != null) {
+ sb.append(DATA_FIELD_DELIM);
+ for (String ld : listData) {
+ sb.append(LIST_ELEMENT_DELIM);
+ sb.append(ld);
+ }
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Serializes the variables of an execution context, or EMPTY if the
+ * execution context is null.
+ *
+ * @param ec execution context, may be null
+ * @return serialized top-level variable map or EMPTY
+ */
+ private static String serializeExecutionContext( ExecutionContext ec ) {
+ return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY;
+ }
+
+ /**
+ * Serializes a list of instructions, delimited by ELEMENT_DELIM. Only CP
+ * instructions and external function invocation instructions are supported;
+ * anything else raises a runtime exception. As a side effect, bytecode of
+ * generated codegen (spoof) operator classes is collected into clsMap, and
+ * literals are sanitized via checkAndReplaceLiterals.
+ *
+ * @param inst instructions to serialize
+ * @param clsMap output map of class name to class bytecode
+ * @return serialized instructions string
+ * @throws DMLRuntimeException if a non-CP instruction is encountered
+ */
+ @SuppressWarnings("all")
+ private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap )
+ {
+ StringBuilder sb = new StringBuilder();
+ int count = 0;
+ for( Instruction linst : inst ) {
+ //check that only cp instruction are transmitted
+ if( !( linst instanceof CPInstruction || linst instanceof ExternalFunctionInvocationInstruction ) )
+ throw new DMLRuntimeException( NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst );
+
+ //obtain serialized version of generated classes
+ if( linst instanceof SpoofCPInstruction ) {
+ Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass();
+ clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName()));
+ }
+
+ if( count > 0 )
+ sb.append( ELEMENT_DELIM );
+
+ sb.append( checkAndReplaceLiterals( linst.toString() ) );
+ count++;
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Replacement of internal delimiters occurring in literals of instructions
+ * in order to ensure robustness of serialization and parsing.
+ * (e.g. print( "a,b" ) would break the parsing of instruction that internally
+ * are separated with a "," )
+ *
+ * @param instStr instruction string
+ * @return instruction string with replacements
+ */
+ private static String checkAndReplaceLiterals( String instStr )
+ {
+ String tmp = instStr;
+
+ //1) check own delimiters (very unlikely due to special characters)
+ //note: replaceAll interprets the delimiters as regular expressions
+ if( tmp.contains(COMPONENTS_DELIM) ) {
+ tmp = tmp.replaceAll(COMPONENTS_DELIM, ".");
+ LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'");
+ }
+
+ if( tmp.contains(ELEMENT_DELIM) ) {
+ tmp = tmp.replaceAll(ELEMENT_DELIM, ".");
+ LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'");
+ }
+
+ if( tmp.contains( LEVELIN ) ){
+ tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex
+ LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('");
+ }
+
+ if( tmp.contains(LEVELOUT) ){
+ tmp = tmp.replaceAll(LEVELOUT, ")");
+ LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'");
+ }
+
+ //NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required
+ //because those literals cannot occur in critical places.
+
+ //2) check end tag of CDATA
+ if( tmp.contains(CDATA_END) ){
+ tmp = tmp.replaceAll(CDATA_END, "."); //prevent XML parsing issues in job.xml
+ LOG.warn("Replaced special literal character sequence "+ CDATA_END +" with '.'");
+ }
+
+ return tmp;
+ }
+
+ /** Serializes a string map as key=value pairs, delimited by ELEMENT_DELIM. */
+ private static String serializeStringHashMap(HashMap<String,String> vars) {
+ return serializeList(vars.entrySet().stream().map(e ->
+ e.getKey()+KEY_VALUE_DELIM+e.getValue()).collect(Collectors.toList()));
+ }
+
+ /**
+ * Serializes result variables; accumulator variables are marked with a
+ * trailing '+' (see parseResultVariables for the inverse).
+ */
+ public static String serializeResultVariables( List<ResultVar> vars) {
+ return serializeList(vars.stream().map(v -> v._isAccum ?
+ v._name+"+" : v._name).collect(Collectors.toList()));
+ }
+
+ /** Joins a list of strings with the default ELEMENT_DELIM delimiter. */
+ public static String serializeList(List<String> elements) {
+ return serializeList(elements, ELEMENT_DELIM);
+ }
+
+ /** Joins a list of strings with the given delimiter. */
+ public static String serializeList(List<String> elements, String delim) {
+ return StringUtils.join(elements, delim);
+ }
+
+ /** Serializes data identifiers, delimited by ELEMENT_DELIM. */
+ private static String serializeDataIdentifiers(List<DataIdentifier> vars) {
+ return serializeList(vars.stream().map(v ->
+ serializeDataIdentifier(v)).collect(Collectors.toList()));
+ }
+
+ /**
+ * Serializes a single data identifier.
+ * SCHEMA: name|datatype|valuetype (see parseDataIdentifier for the inverse).
+ */
+ private static String serializeDataIdentifier( DataIdentifier dat ) {
+ // SCHEMA: <name>|<datatype>|<valuetype>
+ StringBuilder sb = new StringBuilder();
+ sb.append(dat.getName());
+ sb.append(DATA_FIELD_DELIM);
+ sb.append(dat.getDataType());
+ sb.append(DATA_FIELD_DELIM);
+ sb.append(dat.getValueType());
+ return sb.toString();
+ }
+
+ /**
+ * Serializes the function program blocks whose key is contained in the
+ * candidate set, as ELEMENT_DELIM-separated key=block entries.
+ *
+ * @param pbs all function program blocks of the program
+ * @param cand keys of functions to include (reachability candidates)
+ * @param clsMap output map for generated codegen class bytecode
+ * @return serialized function program blocks
+ */
+ private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) {
+ StringBuilder sb = new StringBuilder();
+ int count = 0;
+ for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() ) {
+ if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body
+ continue;
+ if( count>0 ) {
+ sb.append( ELEMENT_DELIM );
+ sb.append( NEWLINE );
+ }
+ sb.append( pb.getKey() );
+ sb.append( KEY_VALUE_DELIM );
+ sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) );
+ count++;
+ }
+ sb.append(NEWLINE);
+ return sb.toString();
+ }
+
+ /**
+ * Serializes a list of program blocks, separated by ELEMENT_DELIM, by
+ * recursively serializing each block.
+ *
+ * @param pbs program blocks to serialize
+ * @param clsMap output map for generated codegen class bytecode
+ * @return serialized program blocks
+ */
+ private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) {
+ StringBuilder sb = new StringBuilder();
+ int count = 0;
+ for( ProgramBlock pb : pbs ) {
+ if( count>0 ) {
+ sb.append( ELEMENT_DELIM );
+ sb.append(NEWLINE);
+ }
+ sb.append( rSerializeProgramBlock(pb, clsMap) );
+ count++;
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Recursively serializes a single program block: a type-specific header tag
+ * (PB_WHILE/PB_FOR/PB_PARFOR/PB_IF/PB_FC/PB_EFC/PB_BEGIN), the type-specific
+ * body components (instructions wrapped in INST_BEGIN/INST_END, child blocks
+ * wrapped in PBS_BEGIN/PBS_END, separated by COMPONENTS_DELIM), and the
+ * PB_END footer. The component ordering must exactly match the corresponding
+ * rParse*ProgramBlock methods.
+ *
+ * @param pb program block to serialize
+ * @param clsMap output map for generated codegen class bytecode
+ * @return serialized program block
+ * @throws DMLRuntimeException for nested remote-MR parfor or non-CP
+ * external function program blocks
+ */
+ private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) {
+ StringBuilder sb = new StringBuilder();
+
+ //handle header
+ if( pb instanceof WhileProgramBlock )
+ sb.append(PB_WHILE);
+ else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
+ sb.append(PB_FOR);
+ else if ( pb instanceof ParForProgramBlock )
+ sb.append(PB_PARFOR);
+ else if ( pb instanceof IfProgramBlock )
+ sb.append(PB_IF);
+ else if ( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
+ sb.append(PB_FC);
+ else if ( pb instanceof ExternalFunctionProgramBlock )
+ sb.append(PB_EFC);
+ else //all generic program blocks
+ sb.append(PB_BEGIN);
+
+ //handle body
+ if( pb instanceof WhileProgramBlock )
+ {
+ //while: predicate, exit instructions, child blocks
+ WhileProgramBlock wpb = (WhileProgramBlock) pb;
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(PBS_BEGIN);
+ sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) );
+ sb.append(PBS_END);
+ }
+ else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) )
+ {
+ //for: iter var, from/to/increment, exit instructions, child blocks
+ ForProgramBlock fpb = (ForProgramBlock) pb;
+ sb.append( fpb.getIterVar() );
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(fpb.getExitInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(PBS_BEGIN);
+ sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
+ sb.append(PBS_END);
+ }
+ else if ( pb instanceof ParForProgramBlock )
+ {
+ //parfor: iter var, result vars, params, from/to/increment, exit, child blocks
+ ParForProgramBlock pfpb = (ParForProgramBlock) pb;
+
+ //check for nested remote ParFOR
+ if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_MR )
+ throw new DMLRuntimeException( NOT_SUPPORTED_MR_PARFOR );
+
+ sb.append( pfpb.getIterVar() );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( serializeResultVariables( pfpb.getResultVariables()) );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(pfpb.getExitInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(PBS_BEGIN);
+ sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) );
+ sb.append(PBS_END);
+ }
+ else if ( pb instanceof IfProgramBlock )
+ {
+ //if: predicate, exit instructions, if body, else body
+ IfProgramBlock ipb = (IfProgramBlock) pb;
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(ipb.getPredicate(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(PBS_BEGIN);
+ sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) );
+ sb.append(PBS_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(PBS_BEGIN);
+ sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) );
+ sb.append(PBS_END);
+ }
+ else if( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
+ {
+ //function: input/output params, instructions, child blocks
+ FunctionProgramBlock fpb = (FunctionProgramBlock) pb;
+
+ sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
+ sb.append( COMPONENTS_DELIM );
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(fpb.getInstructions(), clsMap) );
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(PBS_BEGIN);
+ sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
+ sb.append(PBS_END);
+ sb.append( COMPONENTS_DELIM );
+ }
+ else if( pb instanceof ExternalFunctionProgramBlock )
+ {
+ //external function (CP only): params, other params, basedir, child blocks
+ if( !(pb instanceof ExternalFunctionProgramBlockCP) )
+ {
+ throw new DMLRuntimeException( NOT_SUPPORTED_EXTERNALFUNCTION_PB );
+ }
+
+ ExternalFunctionProgramBlockCP fpb = (ExternalFunctionProgramBlockCP) pb;
+
+ sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( serializeStringHashMap( fpb.getOtherParams() ) );
+ sb.append( COMPONENTS_DELIM );
+ sb.append( fpb.getBaseDir() );
+ sb.append( COMPONENTS_DELIM );
+
+ sb.append(INST_BEGIN);
+ //create on construction anyway
+ sb.append(INST_END);
+ sb.append( COMPONENTS_DELIM );
+ sb.append(PBS_BEGIN);
+ sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
+ sb.append(PBS_END);
+ }
+ else //all generic program blocks
+ {
+ sb.append(INST_BEGIN);
+ sb.append( serializeInstructions(pb.getInstructions(), clsMap) );
+ sb.append(INST_END);
+ }
+
+
+ //handle end
+ sb.append(PB_END);
+
+ return sb.toString();
+ }
+
+
+ ////////////////////////////////
+ // PARSING
+ ////////////////////////////////
+
+ /**
+ * Parses a serialized Spark parameter-server body (inverse of
+ * serializeSparkPSBody). Side effects: sets the DMLScript UUID, installs the
+ * parsed DML/compiler configs, applies additional configs, and adds the
+ * parsed program blocks to the reconstructed program. The resulting body
+ * carries the execution context (which references the program).
+ *
+ * @param in serialized body string
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed spark paramserv body
+ */
+ public static SparkPSBody parseSparkPSBody(String in, int id) {
+ SparkPSBody body = new SparkPSBody();
+
+ //header elimination
+ String tmpin = in.replaceAll(NEWLINE, ""); //normalization
+ tmpin = tmpin.substring(PSBODY_BEGIN.length(), tmpin.length() - PSBODY_END.length()); //remove start/end
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);
+
+ //handle DMLScript UUID (NOTE: set directly in DMLScript)
+ //(master UUID is used for all nodes (in order to simply cleanup))
+ DMLScript.setUUID(st.nextToken());
+
+ //handle DML config (NOTE: set directly in ConfigurationManager)
+ handleDMLConfig(st.nextToken());
+
+ //handle additional configs
+ parseAndSetAdditionalConfigurations(st.nextToken());
+
+ //handle program
+ Program prog = parseProgram(st.nextToken(), id);
+
+ //handle execution context
+ ExecutionContext ec = parseExecutionContext(st.nextToken(), prog);
+ ec.setProgram(prog);
+
+ //handle program blocks
+ String spbs = st.nextToken();
+ ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
+ prog.getProgramBlocks().addAll(pbs);
+
+ body.setEc(ec);
+ return body;
+ }
+
+ /**
+ * Parses a serialized parfor body with default settings (not in Spark).
+ *
+ * @param in serialized body string
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed parfor body
+ */
+ public static ParForBody parseParForBody( String in, int id ) {
+ return parseParForBody(in, id, false);
+ }
+
+ /**
+ * Parses a serialized parfor body (inverse of serializeParForBody). Side
+ * effects: sets the DMLScript UUID and, unless running in local mode,
+ * installs the shipped DML config; additional configs are only applied when
+ * not running inside Spark.
+ *
+ * @param in serialized body string
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @param inSpark true if parsing happens inside a Spark executor
+ * @return parsed parfor body (child blocks, result vars, execution context)
+ */
+ public static ParForBody parseParForBody( String in, int id, boolean inSpark ) {
+ ParForBody body = new ParForBody();
+
+ //header elimination
+ String tmpin = in.replaceAll(NEWLINE, ""); //normalization
+ tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);
+
+ //handle DMLScript UUID (NOTE: set directly in DMLScript)
+ //(master UUID is used for all nodes (in order to simply cleanup))
+ DMLScript.setUUID( st.nextToken() );
+
+ //handle DML config (NOTE: set directly in ConfigurationManager)
+ String confStr = st.nextToken();
+ JobConf job = ConfigurationManager.getCachedJobConf();
+ if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+ handleDMLConfig(confStr);
+ //init internal configuration w/ parsed or default config
+ ParForProgramBlock.initInternalConfigurations(
+ ConfigurationManager.getDMLConfig());
+ }
+
+ //handle additional configs
+ String aconfs = st.nextToken();
+ if( !inSpark )
+ parseAndSetAdditionalConfigurations( aconfs );
+
+ //handle program
+ String progStr = st.nextToken();
+ Program prog = parseProgram( progStr, id );
+
+ //handle result variable names
+ String rvarStr = st.nextToken();
+ ArrayList<ResultVar> rvars = parseResultVariables(rvarStr);
+ body.setResultVariables(rvars);
+
+ //handle execution context
+ String ecStr = st.nextToken();
+ ExecutionContext ec = parseExecutionContext( ecStr, prog );
+
+ //handle program blocks
+ String spbs = st.nextToken();
+ ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
+
+ body.setChildBlocks( pbs );
+ body.setEc( ec );
+
+ return body;
+ }
+
+ /**
+ * Parses a serialized DML config and installs both the DML config and a
+ * compiler config derived from it as local configurations; a null or blank
+ * input is a no-op.
+ *
+ * @param confStr serialized DML configuration, may be null or blank
+ */
+ private static void handleDMLConfig(String confStr) {
+ if(confStr != null && !confStr.trim().isEmpty()) {
+ DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr);
+ CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf);
+ ConfigurationManager.setLocalConfig(dmlconf);
+ ConfigurationManager.setLocalConfig(cconf);
+ }
+ }
+
+ /**
+ * Parses a serialized program (PROG_BEGIN/PROG_END wrapped function program
+ * blocks) and registers each function under its namespace and name.
+ *
+ * @param in serialized program string
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return reconstructed program with registered functions
+ */
+ public static Program parseProgram( String in, int id ) {
+ String lin = in.substring( PROG_BEGIN.length(),in.length()- PROG_END.length()).trim();
+ Program prog = new Program();
+ HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id);
+ for( Entry<String,FunctionProgramBlock> e : fc.entrySet() ) {
+ String[] keypart = e.getKey().split( Program.KEY_DELIM );
+ String namespace = keypart[0];
+ String name = keypart[1];
+ prog.addFunctionProgramBlock(namespace, name, e.getValue());
+ }
+ return prog;
+ }
+
+ /**
+ * Parses a serialized variable map (VARS_BEGIN/VARS_END wrapped); an empty
+ * payload yields an empty symbol table.
+ *
+ * @param in serialized variables string
+ * @return deserialized local variable map (never null)
+ */
+ private static LocalVariableMap parseVariables(String in) {
+ LocalVariableMap ret = null;
+ if( in.length()> VARS_BEGIN.length() + VARS_END.length()) {
+ String varStr = in.substring( VARS_BEGIN.length(),in.length()- VARS_END.length()).trim();
+ ret = LocalVariableMap.deserialize(varStr);
+ }
+ else { //empty input symbol table
+ ret = new LocalVariableMap();
+ }
+ return ret;
+ }
+
+ /**
+ * Parses ELEMENT_DELIM-separated key=block entries into a map of function
+ * program blocks (inverse of rSerializeFunctionProgramBlocks).
+ *
+ * @param in serialized function program blocks
+ * @param prog program the parsed blocks are attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return map of function key to parsed function program block
+ */
+ private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) {
+ HashMap<String,FunctionProgramBlock> ret = new HashMap<>();
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM );
+ while( st.hasMoreTokens() ) {
+ String lvar = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use
+ //put first copy into prog (for direct use)
+ int index = lvar.indexOf( KEY_VALUE_DELIM );
+ String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id;
+ String tmp2 = lvar.substring(index + 1);
+ ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
+ }
+ return ret;
+ }
+
+ /**
+ * Parses a PBS_BEGIN/PBS_END wrapped, ELEMENT_DELIM-separated list of
+ * serialized program blocks (inverse of rSerializeProgramBlocks).
+ *
+ * @param in serialized program blocks
+ * @param prog program the parsed blocks are attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return list of parsed program blocks
+ */
+ private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) {
+ ArrayList<ProgramBlock> pbs = new ArrayList<>();
+ String tmpdata = in.substring(PBS_BEGIN.length(),in.length()- PBS_END.length());
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM);
+ while( st.hasMoreTokens() )
+ pbs.add( rParseProgramBlock( st.nextToken(), prog, id ) );
+ return pbs;
+ }
+
+ /**
+ * Dispatches parsing of a single serialized program block based on its
+ * header tag (inverse of the header written by rSerializeProgramBlock).
+ *
+ * @param in serialized program block (starting with a PB_* tag)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed program block
+ * @throws DMLRuntimeException if the header tag is not recognized
+ */
+ private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) {
+ ProgramBlock pb = null;
+ if( in.startsWith(PB_WHILE) )
+ pb = rParseWhileProgramBlock( in, prog, id );
+ else if ( in.startsWith(PB_FOR) )
+ pb = rParseForProgramBlock( in, prog, id );
+ else if ( in.startsWith(PB_PARFOR) )
+ pb = rParseParForProgramBlock( in, prog, id );
+ else if ( in.startsWith(PB_IF) )
+ pb = rParseIfProgramBlock( in, prog, id );
+ else if ( in.startsWith(PB_FC) )
+ pb = rParseFunctionProgramBlock( in, prog, id );
+ else if ( in.startsWith(PB_EFC) )
+ pb = rParseExternalFunctionProgramBlock( in, prog, id );
+ else if ( in.startsWith(PB_BEGIN) )
+ pb = rParseGenericProgramBlock( in, prog, id );
+ else
+ throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in );
+ return pb;
+ }
+
+ /**
+ * Parses a serialized while program block: predicate instructions, exit
+ * instructions, and child blocks (same component order as serialization).
+ *
+ * @param in serialized while block (PB_WHILE header, PB_END footer)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed while program block
+ */
+ private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) {
+ String lin = in.substring( PB_WHILE.length(),in.length()- PB_END.length());
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+
+ //predicate instructions
+ ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
+
+ //exit instructions
+ ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
+
+ //program blocks
+ ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+
+ WhileProgramBlock wpb = new WhileProgramBlock(prog,inst);
+ wpb.setExitInstructions2(exit);
+ wpb.setChildBlocks(pbs);
+
+ return wpb;
+ }
+
+ /**
+ * Parses a serialized for program block: iteration variable, from/to/
+ * increment instructions, exit instructions, and child blocks.
+ *
+ * @param in serialized for block (PB_FOR header, PB_END footer)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed for program block
+ */
+ private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) {
+ String lin = in.substring( PB_FOR.length(),in.length()- PB_END.length());
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+
+ //inputs
+ String iterVar = st.nextToken();
+
+ //instructions
+ ArrayList<Instruction> from = parseInstructions(st.nextToken(),id);
+ ArrayList<Instruction> to = parseInstructions(st.nextToken(),id);
+ ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id);
+
+ //exit instructions
+ ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
+
+ //program blocks
+ ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+
+ ForProgramBlock fpb = new ForProgramBlock(prog, iterVar);
+ fpb.setFromInstructions(from);
+ fpb.setToInstructions(to);
+ fpb.setIncrementInstructions(incr);
+ fpb.setExitInstructions(exit);
+ fpb.setChildBlocks(pbs);
+
+ return fpb;
+ }
+
+ /**
+ * Parses a serialized parfor program block: iteration variable, result
+ * variables, parfor parameters, from/to/increment instructions, exit
+ * instructions, and child blocks. Note that instructions and child blocks
+ * are parsed with id 0 (preinit state) because the actual thread ids are
+ * replaced during execution; optimization is disabled since it was already
+ * performed in the top-level parfor.
+ *
+ * @param in serialized parfor block (PB_PARFOR header, PB_END footer)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id assigned to the parfor block itself
+ * @return parsed parfor program block
+ */
+ private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) {
+ String lin = in.substring( PB_PARFOR.length(),in.length()- PB_END.length());
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+
+ //inputs
+ String iterVar = st.nextToken();
+ ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken());
+ HashMap<String,String> params = parseStringHashMap(st.nextToken());
+
+ //instructions
+ ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0);
+ ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0);
+ ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0);
+
+ //exit instructions
+ ArrayList<Instruction> exit = parseInstructions(st.nextToken(), 0);
+
+ //program blocks //reset id to preinit state, replaced during exec
+ ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0);
+
+ ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars);
+ pfpb.disableOptimization(); //already done in top-level parfor
+ pfpb.setFromInstructions(from);
+ pfpb.setToInstructions(to);
+ pfpb.setIncrementInstructions(incr);
+ pfpb.setExitInstructions(exit);
+ pfpb.setChildBlocks(pbs);
+
+ return pfpb;
+ }
+
+ /**
+ * Parses a serialized if program block: predicate instructions, exit
+ * instructions, if-body blocks, and else-body blocks.
+ *
+ * @param in serialized if block (PB_IF header, PB_END footer)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed if program block
+ */
+ private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) {
+ String lin = in.substring( PB_IF.length(),in.length()- PB_END.length());
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+
+ //predicate instructions
+ ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
+
+ //exit instructions
+ ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
+
+ //program blocks: if and else
+ ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id);
+ ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id);
+
+ IfProgramBlock ipb = new IfProgramBlock(prog,inst);
+ ipb.setExitInstructions2(exit);
+ ipb.setChildBlocksIfBody(pbs1);
+ ipb.setChildBlocksElseBody(pbs2);
+
+ return ipb;
+ }
+
+ /**
+ * Parses a serialized function program block: input/output parameters,
+ * instructions, and child blocks.
+ *
+ * @param in serialized function block (PB_FC header, PB_END footer)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed function program block
+ */
+ private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) {
+ String lin = in.substring( PB_FC.length(),in.length()- PB_END.length());
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+
+ //inputs and outputs
+ ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
+ ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
+
+ //instructions
+ ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
+
+ //program blocks
+ ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+
+ ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
+ ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
+ FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2);
+ fpb.setInstructions(inst);
+ fpb.setChildBlocks(pbs);
+
+ return fpb;
+ }
+
+ /**
+ * Parses a serialized external function program block (CP variant only,
+ * since no nested MR jobs are supported): input/output parameters, other
+ * parameters, base directory, the (empty) instruction section, and child
+ * blocks.
+ *
+ * @param in serialized external function block (PB_EFC header, PB_END footer)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed CP external function program block
+ */
+ private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) {
+ String lin = in.substring( PB_EFC.length(),in.length()- PB_END.length());
+ HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
+
+ //inputs, outputs and params
+ ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
+ ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
+ HashMap<String,String> dat3 = parseStringHashMap(st.nextToken());
+
+ //basedir
+ String basedir = st.nextToken();
+
+ //instructions (required for removing INST BEGIN, END)
+ parseInstructions(st.nextToken(),id);
+
+ //program blocks
+ ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
+
+ ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
+ ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
+
+ //only CP external functions, because no nested MR jobs for reblocks
+ ExternalFunctionProgramBlockCP efpb = new ExternalFunctionProgramBlockCP(prog, tmp1, tmp2, dat3, basedir);
+ efpb.setChildBlocks(pbs);
+
+ return efpb;
+ }
+
+ /**
+ * Parses a serialized generic program block, which consists of a single
+ * instruction section.
+ *
+ * @param in serialized generic block (PB_BEGIN header, PB_END footer)
+ * @param prog program the parsed block is attached to
+ * @param id thread/worker id used to re-scope instruction thread ids
+ * @return parsed generic program block
+ */
+ private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) {
+ String lin = in.substring( PB_BEGIN.length(),in.length()- PB_END.length());
+ StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM);
+ ProgramBlock pb = new ProgramBlock(prog);
+ pb.setInstructions(parseInstructions(st.nextToken(),id));
+ return pb;
+ }
+
+ /**
+ * Parses a serialized instruction section (INST_BEGIN/INST_END wrapped,
+ * ELEMENT_DELIM-separated). Each instruction is parsed as a CP instruction
+ * and its root thread id is replaced by the child thread id for this worker.
+ *
+ * @param in serialized instructions
+ * @param id thread/worker id substituted into the instructions
+ * @return parsed instructions
+ * @throws DMLRuntimeException if an instruction cannot be parsed
+ */
+ private static ArrayList<Instruction> parseInstructions( String in, int id ) {
+ ArrayList<Instruction> insts = new ArrayList<>();
+ String lin = in.substring( INST_BEGIN.length(),in.length()- INST_END.length());
+ StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM);
+ while(st.hasMoreTokens()) {
+ //Note that at this point only CP instructions and External function instruction can occur
+ String instStr = st.nextToken();
+ try {
+ Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr);
+ tmpinst = saveReplaceThreadID(tmpinst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+id );
+ insts.add( tmpinst );
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex);
+ }
+ }
+ return insts;
+ }
+
+ /**
+ * Parses result variables; a trailing '+' marks an accumulator variable
+ * (inverse of serializeResultVariables).
+ */
+ private static ArrayList<ResultVar> parseResultVariables(String in) {
+ ArrayList<ResultVar> ret = new ArrayList<>();
+ for(String var : parseStringArrayList(in)) {
+ boolean accum = var.endsWith("+");
+ ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum));
+ }
+ return ret;
+ }
+
+ /**
+ * Parses ELEMENT_DELIM-separated key=value pairs into a map (inverse of
+ * serializeStringHashMap).
+ */
+ private static HashMap<String,String> parseStringHashMap( String in ) {
+ HashMap<String,String> vars = new HashMap<>();
+ StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
+ while( st.hasMoreTokens() ) {
+ String lin = st.nextToken();
+ int index = lin.indexOf( KEY_VALUE_DELIM );
+ String tmp1 = lin.substring(0, index);
+ String tmp2 = lin.substring(index + 1);
+ vars.put(tmp1, tmp2);
+ }
+ return vars;
+ }
+
+ /** Splits a string on the default ELEMENT_DELIM delimiter. */
+ private static ArrayList<String> parseStringArrayList(String in) {
+ return parseStringArrayList(in, ELEMENT_DELIM);
+ }
+
+ /** Splits a string on the given delimiter into a presized list. */
+ private static ArrayList<String> parseStringArrayList(String in, String delim) {
+ StringTokenizer st = new StringTokenizer(in, delim);
+ ArrayList<String> vars = new ArrayList<>(st.countTokens());
+ while( st.hasMoreTokens() )
+ vars.add(st.nextToken());
+ return vars;
+ }
+
+ /**
+ * Parses ELEMENT_DELIM-separated data identifiers (inverse of
+ * serializeDataIdentifiers).
+ */
+ private static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) {
+ ArrayList<DataIdentifier> vars = new ArrayList<>();
+ StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
+ while( st.hasMoreTokens() ) {
+ String tmp = st.nextToken();
+ DataIdentifier dat = parseDataIdentifier( tmp );
+ vars.add(dat);
+ }
+ return vars;
+ }
+
+ /**
+ * Parses a single data identifier of the form name|datatype|valuetype
+ * (inverse of serializeDataIdentifier).
+ */
+ private static DataIdentifier parseDataIdentifier( String in ) {
+ StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM);
+ DataIdentifier dat = new DataIdentifier(st.nextToken());
+ dat.setDataType(DataType.valueOf(st.nextToken()));
+ dat.setValueType(ValueType.valueOf(st.nextToken()));
+ return dat;
+ }
+
+ /**
+ * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and
+ * related symbol tables can be hierarchically structured.
+ *
+ * @param in data object as string
+ * @return array of objects
+ */
+ public static Object[] parseDataObject(String in) {
+ Object[] ret = new Object[2];
+
+ StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM );
+ String name = st.nextToken();
+ DataType datatype = DataType.valueOf( st.nextToken() );
+ ValueType valuetype = ValueType.valueOf( st.nextToken() );
+ String valString = st.hasMoreTokens() ? st.nextToken() : "";
+ Data dat = null;
+ switch( datatype )
+ {
+ case SCALAR: {
+ switch ( valuetype ) {
+ case INT: dat = new IntObject(Long.parseLong(valString)); break;
+ case DOUBLE: dat = new DoubleObject(Double.parseDouble(valString)); break;
+ case BOOLEAN: dat = new BooleanObject(Boolean.parseBoolean(valString)); break;
+ case STRING: dat = new StringObject(valString); break;
+ default:
+ throw new DMLRuntimeException("Unable to parse valuetype "+valuetype);
+ }
+ break;
+ }
+ case MATRIX: {
+ MatrixObject mo = new MatrixObject(valuetype,valString);
+ long rows = Long.parseLong( st.nextToken() );
+ long cols = Long.parseLong( st.nextToken() );
+ int brows = Integer.parseInt( st.nextToken() );
+ int bcols = Integer.parseInt( st.nextToken() );
+ long nnz = Long.parseLong( st.nextToken() );
+ InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() );
+ OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() );
+ PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() );
+ UpdateType inplace = UpdateType.valueOf( st.nextToken() );
+ MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz);
+ MetaDataFormat md = new MetaDataFormat( mc, oin, iin );
+ mo.setMetaData( md );
+ if( partFormat._dpf != PDataPartitionFormat.NONE )
+ mo.setPartitioned( partFormat._dpf, partFormat._N );
+ mo.setUpdateType(inplace);
+ dat = mo;
+ break;
+ }
+ case LIST:
+ int size = Integer.parseInt(st.nextToken());
+ String namesStr = st.nextToken();
+ List<String> names = namesStr.equals(EMPTY) ? null :
+ parseStringArrayList(namesStr, ELEMENT_DELIM2);
+ List<Data> data = new ArrayList<>(size);
+ st.nextToken(LIST_ELEMENT_DELIM);
+ for (int i = 0; i < size; i++) {
+ String dataStr = st.nextToken();
+ Object[] obj = parseDataObject(dataStr);
+ data.add((Data) obj[1]);
+ }
+ dat = new ListObject(data, names);
+ break;
+ default:
+ throw new DMLRuntimeException("Unable to parse datatype "+datatype);
+ }
+
+ ret[0] = name;
+ ret[1] = dat;
+ return ret;
+ }
+
+ private static ExecutionContext parseExecutionContext(String in, Program prog) {
+ ExecutionContext ec = null;
+ String lin = in.substring(EC_BEGIN.length(),in.length()- EC_END.length()).trim();
+ if( !lin.equals( EMPTY ) ) {
+ LocalVariableMap vars = parseVariables(lin);
+ ec = ExecutionContextFactory.createContext( false, prog );
+ ec.setVariables(vars);
+ }
+ return ec;
+ }
+
+ private static void parseAndSetAdditionalConfigurations(String conf) {
+ String[] statsFlag = conf.split("=");
+ DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]);
+ }
+
+ //////////
+ // CUSTOM SAFE LITERAL REPLACEMENT
+
+ /**
+ * In-place replacement of thread ids in filenames, function names, etc.
+ *
+ * @param inst instruction
+ * @param pattern thread-id pattern to search for (e.g., the root thread id)
+ * @param replacement string replacement
+ * @return instruction
+ */
+ private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) {
+ //currently known, relevant instructions: createvar, rand, seq, extfunct,
+ if( inst instanceof MRJobInstruction ) {
+ //update dims file, and internal string representations of rand/seq instructions
+ MRJobInstruction mrinst = (MRJobInstruction)inst;
+ mrinst.updateInstructionThreadID(pattern, replacement);
+ }
+ else if ( inst instanceof VariableCPInstruction ) { //createvar, setfilename
+ //update in-memory representation
+ inst.updateInstructionThreadID(pattern, replacement);
+ }
+ //NOTE: Rand, seq in CP not required
+ return inst;
+ }
+
+ public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace) {
+ //save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path
+ //replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end.
+ int pos = fname.lastIndexOf(pattern);
+ return ( pos < 0 ) ? fname : fname.substring(0, pos)
+ + replace + fname.substring(pos+pattern.length());
+ }
+
+
+ //////////
+ // CUSTOM HIERARCHICAL TOKENIZER
+
+
+ /**
+ * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to
+ * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy
+ * levels are skipped.
+ *
+ */
+ private static class HierarchyAwareStringTokenizer //extends StringTokenizer
+ {
+ private String _str = null;
+ private String _del = null;
+ private int _off = -1;
+
+ public HierarchyAwareStringTokenizer( String in, String delim ) {
+ //super(in);
+ _str = in;
+ _del = delim;
+ _off = delim.length();
+ }
+
+ public boolean hasMoreTokens() {
+ return (_str.length() > 0);
+ }
+
+ public String nextToken() {
+ int nextDelim = determineNextSameLevelIndexOf(_str, _del);
+ String token = null;
+ if(nextDelim < 0) {
+ nextDelim = _str.length();
+ _off = 0;
+ }
+ token = _str.substring(0,nextDelim);
+ _str = _str.substring( nextDelim + _off );
+ return token;
+ }
+
+ private static int determineNextSameLevelIndexOf( String data, String pattern )
+ {
+ String tmpdata = data;
+ int index = 0;
+ int count = 0;
+ int off=0,i1,i2,i3,min;
+
+ while(true) {
+ i1 = tmpdata.indexOf(pattern);
+ i2 = tmpdata.indexOf(LEVELIN);
+ i3 = tmpdata.indexOf(LEVELOUT);
+
+ if( i1 < 0 ) return i1; //no pattern found at all
+
+ min = i1; //min >= 0 by definition
+ if( i2 >= 0 ) min = Math.min(min, i2);
+ if( i3 >= 0 ) min = Math.min(min, i3);
+
+ //stack maintenance
+ if( i1 == min && count == 0 )
+ return index+i1;
+ else if( i2 == min ) {
+ count++;
+ off = LEVELIN.length();
+ }
+ else if( i3 == min ) {
+ count--;
+ off = LEVELOUT.length();
+ }
+
+ //prune investigated string
+ index += min+off;
+ tmpdata = tmpdata.substring(min+off);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java
new file mode 100644
index 0000000..d5fd509
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservLocalNNTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.junit.Test;
+
+public class ParamservLocalNNTest extends AutomatedTestBase {
+
+ private static final String TEST_NAME1 = "paramserv-nn-bsp-batch-dc";
+ private static final String TEST_NAME2 = "paramserv-nn-asp-batch";
+ private static final String TEST_NAME3 = "paramserv-nn-bsp-epoch";
+ private static final String TEST_NAME4 = "paramserv-nn-asp-epoch";
+ private static final String TEST_NAME5 = "paramserv-nn-bsp-batch-drr";
+ private static final String TEST_NAME6 = "paramserv-nn-bsp-batch-dr";
+ private static final String TEST_NAME7 = "paramserv-nn-bsp-batch-or";
+
+ private static final String TEST_DIR = "functions/paramserv/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + ParamservLocalNNTest.class.getSimpleName() + "/";
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {}));
+ addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {}));
+ addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {}));
+ addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {}));
+ addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {}));
+ addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {}));
+ addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {}));
+ }
+
+ @Test
+ public void testParamservBSPBatchDisjointContiguous() {
+ runDMLTest(TEST_NAME1);
+ }
+
+ @Test
+ public void testParamservASPBatch() {
+ runDMLTest(TEST_NAME2);
+ }
+
+ @Test
+ public void testParamservBSPEpoch() {
+ runDMLTest(TEST_NAME3);
+ }
+
+ @Test
+ public void testParamservASPEpoch() {
+ runDMLTest(TEST_NAME4);
+ }
+
+ @Test
+ public void testParamservBSPBatchDisjointRoundRobin() {
+ runDMLTest(TEST_NAME5);
+ }
+
+ @Test
+ public void testParamservBSPBatchDisjointRandom() {
+ runDMLTest(TEST_NAME6);
+ }
+
+ @Test
+ public void testParamservBSPBatchOverlapReshuffle() {
+ runDMLTest(TEST_NAME7);
+ }
+
+ private void runDMLTest(String testname) {
+ TestConfiguration config = getTestConfiguration(testname);
+ loadTestConfiguration(config);
+ programArgs = new String[] { "-explain" };
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + testname + ".dml";
+ runTest(true, false, null, null, -1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java
deleted file mode 100644
index d7afc9d..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/ParamservNNTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.paramserv;
-
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.junit.Test;
-
-public class ParamservNNTest extends AutomatedTestBase {
-
- private static final String TEST_NAME1 = "paramserv-nn-bsp-batch-dc";
- private static final String TEST_NAME2 = "paramserv-nn-asp-batch";
- private static final String TEST_NAME3 = "paramserv-nn-bsp-epoch";
- private static final String TEST_NAME4 = "paramserv-nn-asp-epoch";
- private static final String TEST_NAME5 = "paramserv-nn-bsp-batch-drr";
- private static final String TEST_NAME6 = "paramserv-nn-bsp-batch-dr";
- private static final String TEST_NAME7 = "paramserv-nn-bsp-batch-or";
-
- private static final String TEST_DIR = "functions/paramserv/";
- private static final String TEST_CLASS_DIR = TEST_DIR + ParamservNNTest.class.getSimpleName() + "/";
-
- private final String HOME = SCRIPT_DIR + TEST_DIR;
-
- @Override
- public void setUp() {
- addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {}));
- addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {}));
- addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {}));
- addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {}));
- addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {}));
- addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {}));
- addTestConfiguration(TEST_NAME7, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME7, new String[] {}));
- }
-
- @Test
- public void testParamservBSPBatchDisjointContiguous() {
- runDMLTest(TEST_NAME1);
- }
-
- @Test
- public void testParamservASPBatch() {
- runDMLTest(TEST_NAME2);
- }
-
- @Test
- public void testParamservBSPEpoch() {
- runDMLTest(TEST_NAME3);
- }
-
- @Test
- public void testParamservASPEpoch() {
- runDMLTest(TEST_NAME4);
- }
-
- @Test
- public void testParamservBSPBatchDisjointRoundRobin() {
- runDMLTest(TEST_NAME5);
- }
-
- @Test
- public void testParamservBSPBatchDisjointRandom() {
- runDMLTest(TEST_NAME6);
- }
-
- @Test
- public void testParamservBSPBatchOverlapReshuffle() {
- runDMLTest(TEST_NAME7);
- }
-
- private void runDMLTest(String testname) {
- TestConfiguration config = getTestConfiguration(testname);
- loadTestConfiguration(config);
- programArgs = new String[] { "-explain" };
- fullDMLScriptName = HOME + testname + ".dml";
- runTest(true, false, null, null, -1);
- }
-}
[3/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function
shipping and worker setup
Posted by mb...@apache.org.
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
deleted file mode 100644
index 919b357..0000000
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ /dev/null
@@ -1,1866 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.CompilerConfig.ConfigType;
-import org.apache.sysml.conf.CompilerConfig;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.hops.Hop;
-import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.hops.recompile.Recompiler;
-import org.apache.sysml.parser.DMLProgram;
-import org.apache.sysml.parser.DataIdentifier;
-import org.apache.sysml.parser.ForStatementBlock;
-import org.apache.sysml.parser.IfStatementBlock;
-import org.apache.sysml.parser.ParForStatementBlock;
-import org.apache.sysml.parser.StatementBlock;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.parser.ParForStatementBlock.ResultVar;
-import org.apache.sysml.parser.WhileStatementBlock;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.codegen.CodegenUtils;
-import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlock;
-import org.apache.sysml.runtime.controlprogram.ExternalFunctionProgramBlockCP;
-import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.FunctionProgramBlock;
-import org.apache.sysml.runtime.controlprogram.IfProgramBlock;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.Program;
-import org.apache.sysml.runtime.controlprogram.ProgramBlock;
-import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.instructions.CPInstructionParser;
-import org.apache.sysml.runtime.instructions.Instruction;
-import org.apache.sysml.runtime.instructions.InstructionParser;
-import org.apache.sysml.runtime.instructions.MRJobInstruction;
-import org.apache.sysml.runtime.instructions.cp.BooleanObject;
-import org.apache.sysml.runtime.instructions.cp.CPInstruction;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.instructions.cp.DoubleObject;
-import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
-import org.apache.sysml.runtime.instructions.cp.IntObject;
-import org.apache.sysml.runtime.instructions.cp.ScalarObject;
-import org.apache.sysml.runtime.instructions.cp.SpoofCPInstruction;
-import org.apache.sysml.runtime.instructions.cp.StringObject;
-import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
-import org.apache.sysml.runtime.instructions.gpu.GPUInstruction;
-import org.apache.sysml.runtime.instructions.mr.MRInstruction;
-import org.apache.sysml.runtime.instructions.spark.SPInstruction;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.MetaDataFormat;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.udf.ExternalFunctionInvocationInstruction;
-
-/**
- * Program converter functionalities for
- * (1) creating deep copies of program blocks, instructions, function program blocks, and
- * (2) serializing and parsing of programs, program blocks, functions program blocks.
- *
- */
-//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design)
-public class ProgramConverter
-{
- protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName());
-
- //use escaped unicodes for separators in order to prevent string conflict
- public static final String NEWLINE = "\n"; //System.lineSeparator();
- public static final String COMPONENTS_DELIM = "\u236e"; //semicolon w/ bar; ";";
- public static final String ELEMENT_DELIM = "\u236a"; //comma w/ bar; ",";
- public static final String DATA_FIELD_DELIM = "\u007c"; //"|";
- public static final String KEY_VALUE_DELIM = "\u003d"; //"=";
- public static final String LEVELIN = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{";
- public static final String LEVELOUT = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}";
- public static final String EMPTY = "null";
-
- //public static final String CP_ROOT_THREAD_SEPARATOR = "/";//File.separator;
- public static final String CP_ROOT_THREAD_ID = "_t0";
- public static final String CP_CHILD_THREAD = "_t";
-
- public static final String PARFOR_CDATA_BEGIN = "<![CDATA[";
- public static final String PARFOR_CDATA_END = " ]]>";
-
- public static final String PARFOR_PROG_BEGIN = " PROG" + LEVELIN;
- public static final String PARFOR_PROG_END = LEVELOUT;
- public static final String PARFORBODY_BEGIN = PARFOR_CDATA_BEGIN+"PARFORBODY" + LEVELIN;
- public static final String PARFORBODY_END = LEVELOUT+PARFOR_CDATA_END;
- public static final String PARFOR_VARS_BEGIN = "VARS: ";
- public static final String PARFOR_VARS_END = "";
- public static final String PARFOR_PBS_BEGIN = " PBS" + LEVELIN;
- public static final String PARFOR_PBS_END = LEVELOUT;
- public static final String PARFOR_INST_BEGIN = " INST: ";
- public static final String PARFOR_INST_END = "";
- public static final String PARFOR_EC_BEGIN = " EC: ";
- public static final String PARFOR_EC_END = "";
- public static final String PARFOR_PB_BEGIN = " PB" + LEVELIN;
- public static final String PARFOR_PB_END = LEVELOUT;
- public static final String PARFOR_PB_WHILE = " WHILE" + LEVELIN;
- public static final String PARFOR_PB_FOR = " FOR" + LEVELIN;
- public static final String PARFOR_PB_PARFOR = " PARFOR" + LEVELIN;
- public static final String PARFOR_PB_IF = " IF" + LEVELIN;
- public static final String PARFOR_PB_FC = " FC" + LEVELIN;
- public static final String PARFOR_PB_EFC = " EFC" + LEVELIN;
-
- public static final String PARFOR_CONF_STATS = "stats";
-
-
- //exception msgs
- public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " +
- "(ExternalFunctionPRogramBlockCP can be used)";
- public static final String NOT_SUPPORTED_MR_INSTRUCTION = "Not supported: Instructions of type other than CP instructions";
- public static final String NOT_SUPPORTED_MR_PARFOR = "Not supported: Nested ParFOR REMOTE_MR due to possible deadlocks." +
- "(LOCAL can be used for innner ParFOR)";
- public static final String NOT_SUPPORTED_PB = "Not supported: type of program block";
-
- ////////////////////////////////
- // CREATION of DEEP COPIES
- ////////////////////////////////
-
- /**
- * Creates a deep copy of the given execution context.
- * For rt_platform=Hadoop, execution context has a symbol table.
- *
- * @param ec execution context
- * @return execution context
- * @throws CloneNotSupportedException if CloneNotSupportedException occurs
- */
- public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec)
- throws CloneNotSupportedException
- {
- ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram());
- cpec.setVariables((LocalVariableMap) ec.getVariables().clone());
-
- //handle result variables with in-place update flag
- //(each worker requires its own copy of the empty matrix object)
- for( String var : cpec.getVariables().keySet() ) {
- Data dat = cpec.getVariables().get(var);
- if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) {
- MatrixObject mo = (MatrixObject)dat;
- MatrixObject moNew = new MatrixObject(mo);
- if( mo.getNnz() != 0 ){
- // If output matrix is not empty (NNZ != 0), then local copy is created so that
- // update in place operation can be applied.
- MatrixBlock mbVar = mo.acquireRead();
- moNew.acquireModify (new MatrixBlock(mbVar));
- mo.release();
- } else {
- //create empty matrix block w/ dense representation (preferred for update in-place)
- //Creating a dense matrix block is valid because empty block not allocated and transfer
- // to sparse representation happens in left indexing in place operation.
- moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false));
- }
- moNew.release();
- cpec.setVariable(var, moNew);
- }
- }
-
- return cpec;
- }
-
- /**
- * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the
- * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order
- * to support arbitrary control-flow constructs within a parfor.
- *
- * @param childBlocks child program blocks
- * @param pid ?
- * @param IDPrefix ?
- * @param fnStack ?
- * @param fnCreated ?
- * @param plain if true, full deep copy without id replacement
- * @param forceDeepCopy if true, force deep copy
- * @return list of program blocks
- */
- public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy)
- {
- ArrayList<ProgramBlock> tmp = new ArrayList<>();
-
- for( ProgramBlock pb : childBlocks )
- {
- Program prog = pb.getProgram();
- ProgramBlock tmpPB = null;
-
- if( pb instanceof WhileProgramBlock )
- {
- tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
- }
- else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
- {
- tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy );
- }
- else if( pb instanceof ParForProgramBlock )
- {
- ParForProgramBlock pfpb = (ParForProgramBlock) pb;
- if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM )
- tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
- else
- tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
- }
- else if( pb instanceof IfProgramBlock )
- {
- tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
- }
- else //last-level program block
- {
- tmpPB = new ProgramBlock(prog); // general case use for most PBs
-
- //for recompile in the master node JVM
- tmpPB.setStatementBlock(createStatementBlockCopy(pb.getStatementBlock(), pid, plain, forceDeepCopy));
- //tmpPB.setStatementBlock(pb.getStatementBlock());
- tmpPB.setThreadID(pid);
- }
-
- //copy instructions
- tmpPB.setInstructions( createDeepCopyInstructionSet(pb.getInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
-
- //copy symbol table
- //tmpPB.setVariables( pb.getVariables() ); //implicit cloning
-
- tmp.add(tmpPB);
- }
-
- return tmp;
- }
-
- public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
- ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
- WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst);
- tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) );
- tmpPB.setThreadID(pid);
-
- tmpPB.setExitInstructions2( createDeepCopyInstructionSet(wpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
- tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
-
- return tmpPB;
- }
-
- public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
- ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
- IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst);
- tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) );
- tmpPB.setThreadID(pid);
-
- tmpPB.setExitInstructions2( createDeepCopyInstructionSet(ipb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true));
- tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
- tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
-
- return tmpPB;
- }
-
- public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
- ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
- tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy));
- tmpPB.setThreadID(pid);
-
- tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
- tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
- tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
- tmpPB.setExitInstructions( createDeepCopyInstructionSet(fpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
- tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) );
-
- return tmpPB;
- }
-
- public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) {
- ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
-
- tmpPB.setFromInstructions( fpb.getFromInstructions() );
- tmpPB.setToInstructions( fpb.getToInstructions() );
- tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() );
- tmpPB.setExitInstructions( fpb.getExitInstructions() );
- tmpPB.setChildBlocks( fpb.getChildBlocks() );
-
- return tmpPB;
- }
-
- public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy) {
- ParForProgramBlock tmpPB = null;
-
- if( IDPrefix == -1 ) //still on master node
- tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
- else //child of remote ParWorker at any level
- tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
-
- tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
- tmpPB.setThreadID(pid);
-
- tmpPB.disableOptimization(); //already done in top-level parfor
- tmpPB.disableMonitorReport(); //already done in top-level parfor
-
- tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
- tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
- tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
- tmpPB.setExitInstructions( createDeepCopyInstructionSet(pfpb.getExitInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
-
- //NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway
- //and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested.
- if( plain || forceDeepCopy )
- tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) );
- else
- tmpPB.setChildBlocks( pfpb.getChildBlocks() );
-
- return tmpPB;
- }
-
- /**
- * This creates a deep copy of a function program block. The central reference to singletons of function program blocks
- * poses the need for explicit copies in order to prevent conflicting writes of temporary variables (see ExternalFunctionProgramBlock.
- *
- * @param namespace function namespace
- * @param oldName ?
- * @param pid ?
- * @param IDPrefix ?
- * @param prog runtime program
- * @param fnStack ?
- * @param fnCreated ?
- * @param plain ?
- */
- public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain)
- {
- //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock)
- FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName);
- String fnameNew = (plain)? oldName :(oldName+CP_CHILD_THREAD+pid);
- String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew);
-
- if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) )
- return; //prevent redundant deep copy if already existent
-
- //create deep copy
- FunctionProgramBlock copy = null;
- ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
- ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
- if( fpb.getInputParams()!= null )
- tmp1.addAll(fpb.getInputParams());
- if( fpb.getOutputParams()!= null )
- tmp2.addAll(fpb.getOutputParams());
-
- if( fpb instanceof ExternalFunctionProgramBlockCP )
- {
- ExternalFunctionProgramBlockCP efpb = (ExternalFunctionProgramBlockCP) fpb;
- HashMap<String,String> tmp3 = efpb.getOtherParams();
- if( IDPrefix!=-1 )
- copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_CHILD_THREAD+IDPrefix,CP_CHILD_THREAD+pid));
- else
- copy = new ExternalFunctionProgramBlockCP(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_ROOT_THREAD_ID,CP_CHILD_THREAD+pid));
- }
- else if( fpb instanceof ExternalFunctionProgramBlock )
- {
- ExternalFunctionProgramBlock efpb = (ExternalFunctionProgramBlock) fpb;
- HashMap<String,String> tmp3 = efpb.getOtherParams();
- if( IDPrefix!=-1 )
- copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_CHILD_THREAD+IDPrefix, CP_CHILD_THREAD+pid));
- else
- copy = new ExternalFunctionProgramBlock(prog,tmp1,tmp2,tmp3,saveReplaceFilenameThreadID(efpb.getBaseDir(),CP_ROOT_THREAD_ID, CP_CHILD_THREAD+pid));
- }
- else
- {
- if( !fnStack.contains(fnameNewKey) ) {
- fnStack.add(fnameNewKey);
- copy = new FunctionProgramBlock(prog, tmp1, tmp2);
- copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
- copy.setRecompileOnce( fpb.isRecompileOnce() );
- copy.setThreadID(pid);
- fnStack.remove(fnameNewKey);
- }
- else //stop deep copy for recursive function calls
- copy = fpb;
- }
-
- //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
- //note: instructions not used by function program block
-
- //put
- prog.addFunctionProgramBlock(namespace, fnameNew, copy);
- fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
- }
-
- public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, HashSet<String> fnStack, HashSet<String> fnCreated)
- {
- if( fpb == null )
- throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock.");
-
- //create deep copy
- FunctionProgramBlock copy = null;
- ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
- ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
- if( fpb.getInputParams()!= null )
- tmp1.addAll(fpb.getInputParams());
- if( fpb.getOutputParams()!= null )
- tmp2.addAll(fpb.getOutputParams());
-
- copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
- copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) );
- copy.setStatementBlock( fpb.getStatementBlock() );
- copy.setRecompileOnce(fpb.isRecompileOnce());
- //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
- //note: instructions not used by function program block
-
- return copy;
- }
-
-
- /**
- * Creates a deep copy of an array of instructions and replaces the placeholders of parworker
- * IDs with the concrete IDs of this parfor instance. This is a helper method uses for generating
- * deep copies of program blocks.
- *
- * @param instSet list of instructions
- * @param pid ?
- * @param IDPrefix ?
- * @param prog runtime program
- * @param fnStack ?
- * @param fnCreated ?
- * @param plain ?
- * @param cpFunctions ?
- * @return list of instructions
- */
- public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean cpFunctions)
- {
- ArrayList<Instruction> tmp = new ArrayList<>();
- for( Instruction inst : instSet )
- {
- if( inst instanceof FunctionCallCPInstruction && cpFunctions )
- {
- FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst;
- createDeepCopyFunctionProgramBlock( finst.getNamespace(),
- finst.getFunctionName(),
- pid, IDPrefix, prog, fnStack, fnCreated, plain );
- }
-
- tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) );
- }
-
- return tmp;
- }
-
- public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions )
- {
- Instruction inst = null;
- String tmpString = oInst.toString();
-
- try
- {
- if( oInst instanceof CPInstruction || oInst instanceof SPInstruction || oInst instanceof MRInstruction
- || oInst instanceof GPUInstruction )
- {
- if( oInst instanceof FunctionCallCPInstruction && cpFunctions )
- {
- FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst;
- if( !plain )
- {
- //safe replacement because target variables might include the function name
- //note: this is no update-in-place in order to keep the original function name as basis
- tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + CP_CHILD_THREAD+pid);
- }
- //otherwise: preserve function name
- }
-
- inst = InstructionParser.parseSingleInstruction(tmpString);
- }
- else if( oInst instanceof MRJobInstruction )
- {
- //clone via copy constructor
- inst = new MRJobInstruction( (MRJobInstruction)oInst );
- }
- else
- throw new DMLRuntimeException("Failed to clone instruction: "+oInst);
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException(ex);
- }
-
- //save replacement of thread id references in instructions
- inst = saveReplaceThreadID( inst, ProgramConverter.CP_ROOT_THREAD_ID,
- ProgramConverter.CP_CHILD_THREAD+pid);
-
- return inst;
- }
-
- public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
- {
- StatementBlock ret = null;
-
- try
- {
- if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
- && sb != null //forced deep copy for function recompilation
- && (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy) )
- {
- //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
- ret = new StatementBlock();
- ret.setDMLProg(sb.getDMLProg());
- ret.setParseInfo(sb);
- ret.setLiveIn( sb.liveIn() );
- ret.setLiveOut( sb.liveOut() );
- ret.setUpdatedVariables( sb.variablesUpdated() );
- ret.setReadVariables( sb.variablesRead() );
-
- //deep copy hops dag for concurrent recompile
- ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() );
- if( !plain )
- Recompiler.updateFunctionNames( hops, pid );
- ret.setHops( hops );
- ret.updateRecompilationFlag();
- }
- else
- {
- ret = sb;
- }
- }
- catch( Exception ex )
- {
- throw new DMLRuntimeException( ex );
- }
-
- return ret;
- }
-
- public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
- {
- IfStatementBlock ret = null;
-
- try
- {
- if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
- && sb != null //forced deep copy for function recompile
- && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) )
- {
- //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
- ret = new IfStatementBlock();
- ret.setDMLProg(sb.getDMLProg());
- ret.setParseInfo(sb);
- ret.setLiveIn( sb.liveIn() );
- ret.setLiveOut( sb.liveOut() );
- ret.setUpdatedVariables( sb.variablesUpdated() );
- ret.setReadVariables( sb.variablesRead() );
-
- //shallow copy child statements
- ret.setStatements( sb.getStatements() );
-
- //deep copy predicate hops dag for concurrent recompile
- Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
- ret.setPredicateHops( hops );
- ret.updatePredicateRecompilationFlag();
- }
- else
- {
- ret = sb;
- }
- }
- catch( Exception ex )
- {
- throw new DMLRuntimeException( ex );
- }
-
- return ret;
- }
-
- public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
- {
- WhileStatementBlock ret = null;
-
- try
- {
- if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
- && sb != null //forced deep copy for function recompile
- && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) )
- {
- //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
- ret = new WhileStatementBlock();
- ret.setDMLProg(sb.getDMLProg());
- ret.setParseInfo(sb);
- ret.setLiveIn( sb.liveIn() );
- ret.setLiveOut( sb.liveOut() );
- ret.setUpdatedVariables( sb.variablesUpdated() );
- ret.setReadVariables( sb.variablesRead() );
- ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
-
- //shallow copy child statements
- ret.setStatements( sb.getStatements() );
-
- //deep copy predicate hops dag for concurrent recompile
- Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
- ret.setPredicateHops( hops );
- ret.updatePredicateRecompilationFlag();
- }
- else
- {
- ret = sb;
- }
- }
- catch( Exception ex )
- {
- throw new DMLRuntimeException( ex );
- }
-
- return ret;
- }
-
- public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
- {
- ForStatementBlock ret = null;
-
- try
- {
- if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
- && sb != null
- && ( Recompiler.requiresRecompilation(sb.getFromHops()) ||
- Recompiler.requiresRecompilation(sb.getToHops()) ||
- Recompiler.requiresRecompilation(sb.getIncrementHops()) ||
- forceDeepCopy ) )
- {
- ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock();
-
- //create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
- ret.setDMLProg(sb.getDMLProg());
- ret.setParseInfo(sb);
- ret.setLiveIn( sb.liveIn() );
- ret.setLiveOut( sb.liveOut() );
- ret.setUpdatedVariables( sb.variablesUpdated() );
- ret.setReadVariables( sb.variablesRead() );
- ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
-
- //shallow copy child statements
- ret.setStatements( sb.getStatements() );
-
- //deep copy predicate hops dag for concurrent recompile
- if( sb.requiresFromRecompilation() ){
- Hop hops = Recompiler.deepCopyHopsDag( sb.getFromHops() );
- ret.setFromHops( hops );
- }
- if( sb.requiresToRecompilation() ){
- Hop hops = Recompiler.deepCopyHopsDag( sb.getToHops() );
- ret.setToHops( hops );
- }
- if( sb.requiresIncrementRecompilation() ){
- Hop hops = Recompiler.deepCopyHopsDag( sb.getIncrementHops() );
- ret.setIncrementHops( hops );
- }
- ret.updatePredicateRecompilationFlags();
- }
- else
- {
- ret = sb;
- }
- }
- catch( Exception ex )
- {
- throw new DMLRuntimeException( ex );
- }
-
- return ret;
- }
-
-
- ////////////////////////////////
- // SERIALIZATION
- ////////////////////////////////
-
- public static String serializeParForBody( ParForBody body ) {
- return serializeParForBody(body, new HashMap<String, byte[]>());
- }
-
- public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap )
- {
- ArrayList<ProgramBlock> pbs = body.getChildBlocks();
- ArrayList<ResultVar> rVnames = body.getResultVariables();
- ExecutionContext ec = body.getEc();
-
- if( pbs.isEmpty() )
- return PARFORBODY_BEGIN + PARFORBODY_END;
-
- Program prog = pbs.get( 0 ).getProgram();
-
- StringBuilder sb = new StringBuilder();
- sb.append( PARFORBODY_BEGIN );
- sb.append( NEWLINE );
-
- //handle DMLScript UUID (propagate original uuid for writing to scratch space)
- sb.append( DMLScript.getUUID() );
- sb.append( COMPONENTS_DELIM );
- sb.append( NEWLINE );
-
- //handle DML config
- sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() );
- sb.append( COMPONENTS_DELIM );
- sb.append( NEWLINE );
-
- //handle additional configurations
- sb.append( PARFOR_CONF_STATS + "=" + DMLScript.STATISTICS );
- sb.append( COMPONENTS_DELIM );
- sb.append( NEWLINE );
-
- //handle program
- sb.append( PARFOR_PROG_BEGIN );
- sb.append( NEWLINE );
- sb.append( serializeProgram(prog, pbs, clsMap) );
- sb.append( PARFOR_PROG_END );
- sb.append( NEWLINE );
- sb.append( COMPONENTS_DELIM );
- sb.append( NEWLINE );
-
- //handle result variable names
- sb.append( serializeResultVariables(rVnames) );
- sb.append( COMPONENTS_DELIM );
-
- //handle execution context
- //note: this includes also the symbol table (serialize only the top-level variable map,
- // (symbol tables for nested/child blocks are created at parse time, on the remote side)
- sb.append( PARFOR_EC_BEGIN );
- sb.append( serializeExecutionContext(ec) );
- sb.append( PARFOR_EC_END );
- sb.append( NEWLINE );
- sb.append( COMPONENTS_DELIM );
- sb.append( NEWLINE );
-
- //handle program blocks -- ONLY instructions, not variables.
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( NEWLINE );
- sb.append( rSerializeProgramBlocks(pbs, clsMap) );
- sb.append( PARFOR_PBS_END );
- sb.append( NEWLINE );
-
- sb.append( PARFORBODY_END );
-
- return sb.toString();
- }
-
- private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) {
- //note program contains variables, programblocks and function program blocks
- //but in order to avoid redundancy, we only serialize function program blocks
- HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks();
- HashSet<String> cand = new HashSet<>();
- rFindSerializationCandidates(pbs, cand);
- return rSerializeFunctionProgramBlocks( fpb, cand, clsMap );
- }
-
- private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand )
- {
- for( ProgramBlock pb : pbs )
- {
- if( pb instanceof WhileProgramBlock )
- {
- WhileProgramBlock wpb = (WhileProgramBlock) pb;
- rFindSerializationCandidates(wpb.getChildBlocks(), cand );
- }
- else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock )
- {
- ForProgramBlock fpb = (ForProgramBlock) pb;
- rFindSerializationCandidates(fpb.getChildBlocks(), cand);
- }
- else if ( pb instanceof IfProgramBlock )
- {
- IfProgramBlock ipb = (IfProgramBlock) pb;
- rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand);
- if( ipb.getChildBlocksElseBody() != null )
- rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand);
- }
- else //all generic program blocks
- {
- for( Instruction inst : pb.getInstructions() )
- if( inst instanceof FunctionCallCPInstruction )
- {
- FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst;
- String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName());
- if( !cand.contains(fkey) ) //memoization for multiple calls, recursion
- {
- cand.add( fkey ); //add to candidates
-
- //investigate chains of function calls
- FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName());
- rFindSerializationCandidates(fpb.getChildBlocks(), cand);
- }
- }
- }
- }
- }
-
- private static String serializeVariables (LocalVariableMap vars) {
- StringBuilder sb = new StringBuilder();
- sb.append( PARFOR_VARS_BEGIN );
- sb.append( vars.serialize() );
- sb.append( PARFOR_VARS_END );
- return sb.toString();
- }
-
- public static String serializeDataObject(String key, Data dat)
- {
- // SCHEMA: <name>|<datatype>|<valuetype>|value
- // (scalars are serialize by value, matrices by filename)
- StringBuilder sb = new StringBuilder();
- //prepare data for serialization
- String name = key;
- DataType datatype = dat.getDataType();
- ValueType valuetype = dat.getValueType();
- String value = null;
- String[] matrixMetaData = null;
- switch( datatype )
- {
- case SCALAR:
- ScalarObject so = (ScalarObject) dat;
- //name = so.getName();
- value = so.getStringValue();
- break;
- case MATRIX:
- MatrixObject mo = (MatrixObject) dat;
- MetaDataFormat md = (MetaDataFormat) dat.getMetaData();
- MatrixCharacteristics mc = md.getMatrixCharacteristics();
- value = mo.getFileName();
- PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat(
- mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE;
- matrixMetaData = new String[9];
- matrixMetaData[0] = String.valueOf( mc.getRows() );
- matrixMetaData[1] = String.valueOf( mc.getCols() );
- matrixMetaData[2] = String.valueOf( mc.getRowsPerBlock() );
- matrixMetaData[3] = String.valueOf( mc.getColsPerBlock() );
- matrixMetaData[4] = String.valueOf( mc.getNonZeros() );
- matrixMetaData[5] = InputInfo.inputInfoToString( md.getInputInfo() );
- matrixMetaData[6] = OutputInfo.outputInfoToString( md.getOutputInfo() );
- matrixMetaData[7] = String.valueOf( partFormat );
- matrixMetaData[8] = String.valueOf( mo.getUpdateType() );
- break;
- default:
- throw new DMLRuntimeException("Unable to serialize datatype "+datatype);
- }
-
- //serialize data
- sb.append(name);
- sb.append(DATA_FIELD_DELIM);
- sb.append(datatype);
- sb.append(DATA_FIELD_DELIM);
- sb.append(valuetype);
- sb.append(DATA_FIELD_DELIM);
- sb.append(value);
- if( matrixMetaData != null )
- for( int i=0; i<matrixMetaData.length; i++ )
- {
- sb.append(DATA_FIELD_DELIM);
- sb.append(matrixMetaData[i]);
- }
-
- return sb.toString();
- }
-
- private static String serializeExecutionContext( ExecutionContext ec ) {
- return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY;
- }
-
- @SuppressWarnings("all")
- private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap )
- {
- StringBuilder sb = new StringBuilder();
- int count = 0;
- for( Instruction linst : inst )
- {
- //check that only cp instruction are transmitted
- if( !( linst instanceof CPInstruction || linst instanceof ExternalFunctionInvocationInstruction ) )
- throw new DMLRuntimeException( NOT_SUPPORTED_MR_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst );
-
- //obtain serialized version of generated classes
- if( linst instanceof SpoofCPInstruction ) {
- Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass();
- clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName()));
- }
-
- if( count > 0 )
- sb.append( ELEMENT_DELIM );
-
- sb.append( checkAndReplaceLiterals( linst.toString() ) );
- count++;
- }
-
- return sb.toString();
- }
-
- /**
- * Replacement of internal delimiters occurring in literals of instructions
- * in order to ensure robustness of serialization and parsing.
- * (e.g. print( "a,b" ) would break the parsing of instruction that internally
- * are separated with a "," )
- *
- * @param instStr instruction string
- * @return instruction string with replacements
- */
- private static String checkAndReplaceLiterals( String instStr )
- {
- String tmp = instStr;
-
- //1) check own delimiters (very unlikely due to special characters)
- if( tmp.contains(COMPONENTS_DELIM) ) {
- tmp = tmp.replaceAll(COMPONENTS_DELIM, ".");
- LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'");
- }
-
- if( tmp.contains(ELEMENT_DELIM) ) {
- tmp = tmp.replaceAll(ELEMENT_DELIM, ".");
- LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'");
- }
-
- if( tmp.contains( LEVELIN ) ){
- tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex
- LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('");
- }
-
- if( tmp.contains(LEVELOUT) ){
- tmp = tmp.replaceAll(LEVELOUT, ")");
- LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'");
- }
-
- //NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required
- //because those literals cannot occur in critical places.
-
- //2) check end tag of CDATA
- if( tmp.contains(PARFOR_CDATA_END) ){
- tmp = tmp.replaceAll(PARFOR_CDATA_END, "."); //prevent XML parsing issues in job.xml
- LOG.warn("Replaced special literal character sequence "+PARFOR_CDATA_END+" with '.'");
- }
-
- return tmp;
- }
-
- private static String serializeStringHashMap( HashMap<String,String> vars)
- {
- StringBuilder sb = new StringBuilder();
- int count=0;
- for( Entry<String,String> e : vars.entrySet() )
- {
- if(count>0)
- sb.append( ELEMENT_DELIM );
- sb.append( e.getKey() );
- sb.append( KEY_VALUE_DELIM );
- sb.append( e.getValue() );
- count++;
- }
- return sb.toString();
- }
-
- public static String serializeStringCollection( Collection<String> set)
- {
- StringBuilder sb = new StringBuilder();
- int count=0;
- for( String s : set )
- {
- if(count>0)
- sb.append( ", " );
- sb.append( s );
- count++;
- }
- return sb.toString();
- }
-
- public static String serializeResultVariables( ArrayList<ResultVar> vars) {
- StringBuilder sb = new StringBuilder();
- int count=0;
- for( ResultVar var : vars ) {
- if(count>0)
- sb.append( ELEMENT_DELIM );
- sb.append( var._isAccum ? var._name+"+" : var._name );
- count++;
- }
- return sb.toString();
- }
-
- public static String serializeStringArrayList( ArrayList<String> vars)
- {
- StringBuilder sb = new StringBuilder();
- int count=0;
- for( String s : vars )
- {
- if(count>0)
- sb.append( ELEMENT_DELIM );
- sb.append( s );
- count++;
- }
- return sb.toString();
- }
-
- private static String serializeDataIdentifiers( ArrayList<DataIdentifier> var)
- {
- StringBuilder sb = new StringBuilder();
- int count=0;
- for( DataIdentifier dat : var )
- {
- if(count>0)
- sb.append( ELEMENT_DELIM );
- sb.append( serializeDataIdentifier(dat) );
- count++;
- }
- return sb.toString();
- }
-
- private static String serializeDataIdentifier( DataIdentifier dat ) {
- // SCHEMA: <name>|<datatype>|<valuetype>
- StringBuilder sb = new StringBuilder();
- sb.append(dat.getName());
- sb.append(DATA_FIELD_DELIM);
- sb.append(dat.getDataType());
- sb.append(DATA_FIELD_DELIM);
- sb.append(dat.getValueType());
-
- return sb.toString();
- }
-
- private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap)
- {
- StringBuilder sb = new StringBuilder();
-
- int count = 0;
- for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() )
- {
- if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body
- continue;
-
- if( count>0 ) {
- sb.append( ELEMENT_DELIM );
- sb.append( NEWLINE );
- }
- sb.append( pb.getKey() );
- sb.append( KEY_VALUE_DELIM );
- sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) );
- count++;
- }
- sb.append(NEWLINE);
- return sb.toString();
- }
-
- private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) {
- StringBuilder sb = new StringBuilder();
- int count = 0;
- for( ProgramBlock pb : pbs )
- {
- if( count>0 )
- {
- sb.append( ELEMENT_DELIM );
- sb.append(NEWLINE);
- }
- sb.append( rSerializeProgramBlock(pb, clsMap) );
- count++;
- }
-
- return sb.toString();
- }
-
- private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) {
- StringBuilder sb = new StringBuilder();
-
- //handle header
- if( pb instanceof WhileProgramBlock )
- sb.append( PARFOR_PB_WHILE );
- else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
- sb.append( PARFOR_PB_FOR );
- else if ( pb instanceof ParForProgramBlock )
- sb.append( PARFOR_PB_PARFOR );
- else if ( pb instanceof IfProgramBlock )
- sb.append( PARFOR_PB_IF );
- else if ( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
- sb.append( PARFOR_PB_FC );
- else if ( pb instanceof ExternalFunctionProgramBlock )
- sb.append( PARFOR_PB_EFC );
- else //all generic program blocks
- sb.append( PARFOR_PB_BEGIN );
-
- //handle body
- if( pb instanceof WhileProgramBlock )
- {
- WhileProgramBlock wpb = (WhileProgramBlock) pb;
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) );
- sb.append( PARFOR_PBS_END );
- }
- else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) )
- {
- ForProgramBlock fpb = (ForProgramBlock) pb;
- sb.append( fpb.getIterVar() );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(fpb.getExitInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
- sb.append( PARFOR_PBS_END );
- }
- else if ( pb instanceof ParForProgramBlock )
- {
- ParForProgramBlock pfpb = (ParForProgramBlock) pb;
-
- //check for nested remote ParFOR
- if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_MR )
- throw new DMLRuntimeException( NOT_SUPPORTED_MR_PARFOR );
-
- sb.append( pfpb.getIterVar() );
- sb.append( COMPONENTS_DELIM );
- sb.append( serializeResultVariables( pfpb.getResultVariables()) );
- sb.append( COMPONENTS_DELIM );
- sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(pfpb.getExitInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) );
- sb.append( PARFOR_PBS_END );
- }
- else if ( pb instanceof IfProgramBlock )
- {
- IfProgramBlock ipb = (IfProgramBlock) pb;
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(ipb.getPredicate(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) );
- sb.append( PARFOR_PBS_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) );
- sb.append( PARFOR_PBS_END );
- }
- else if( pb instanceof FunctionProgramBlock && !(pb instanceof ExternalFunctionProgramBlock) )
- {
- FunctionProgramBlock fpb = (FunctionProgramBlock) pb;
-
- sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
- sb.append( COMPONENTS_DELIM );
- sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(fpb.getInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
- sb.append( PARFOR_PBS_END );
- sb.append( COMPONENTS_DELIM );
- }
- else if( pb instanceof ExternalFunctionProgramBlock )
- {
- if( !(pb instanceof ExternalFunctionProgramBlockCP) )
- {
- throw new DMLRuntimeException( NOT_SUPPORTED_EXTERNALFUNCTION_PB );
- }
-
- ExternalFunctionProgramBlockCP fpb = (ExternalFunctionProgramBlockCP) pb;
-
- sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
- sb.append( COMPONENTS_DELIM );
- sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
- sb.append( COMPONENTS_DELIM );
- sb.append( serializeStringHashMap( fpb.getOtherParams() ) );
- sb.append( COMPONENTS_DELIM );
- sb.append( fpb.getBaseDir() );
- sb.append( COMPONENTS_DELIM );
-
- sb.append( PARFOR_INST_BEGIN );
- //create on construction anyway
- sb.append( PARFOR_INST_END );
- sb.append( COMPONENTS_DELIM );
- sb.append( PARFOR_PBS_BEGIN );
- sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
- sb.append( PARFOR_PBS_END );
- }
- else //all generic program blocks
- {
- sb.append( PARFOR_INST_BEGIN );
- sb.append( serializeInstructions(pb.getInstructions(), clsMap) );
- sb.append( PARFOR_INST_END );
- }
-
-
- //handle end
- sb.append( PARFOR_PB_END );
-
- return sb.toString();
- }
-
-
- ////////////////////////////////
- // PARSING
- ////////////////////////////////
- public static ParForBody parseParForBody( String in, int id ) {
- return parseParForBody(in, id, false);
- }
-
- public static ParForBody parseParForBody( String in, int id, boolean inSpark ) {
- ParForBody body = new ParForBody();
-
- //header elimination
- String tmpin = in.replaceAll(NEWLINE, ""); //normalization
- tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);
-
- //handle DMLScript UUID (NOTE: set directly in DMLScript)
- //(master UUID is used for all nodes (in order to simply cleanup))
- DMLScript.setUUID( st.nextToken() );
-
- //handle DML config (NOTE: set directly in ConfigurationManager)
- String confStr = st.nextToken();
- JobConf job = ConfigurationManager.getCachedJobConf();
- if( !InfrastructureAnalyzer.isLocalMode(job) ) {
- if( confStr != null && !confStr.trim().isEmpty() ) {
- DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr);
- CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf);
- ConfigurationManager.setLocalConfig(dmlconf);
- ConfigurationManager.setLocalConfig(cconf);
- }
- //init internal configuration w/ parsed or default config
- ParForProgramBlock.initInternalConfigurations(
- ConfigurationManager.getDMLConfig());
- }
-
- //handle additional configs
- String aconfs = st.nextToken();
- if( !inSpark )
- parseAndSetAdditionalConfigurations( aconfs );
-
- //handle program
- String progStr = st.nextToken();
- Program prog = parseProgram( progStr, id );
-
- //handle result variable names
- String rvarStr = st.nextToken();
- ArrayList<ResultVar> rvars = parseResultVariables(rvarStr);
- body.setResultVariables(rvars);
-
- //handle execution context
- String ecStr = st.nextToken();
- ExecutionContext ec = parseExecutionContext( ecStr, prog );
-
- //handle program blocks
- String spbs = st.nextToken();
- ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
-
- body.setChildBlocks( pbs );
- body.setEc( ec );
-
- return body;
- }
-
- public static Program parseProgram( String in, int id ) {
- String lin = in.substring( PARFOR_PROG_BEGIN.length(),in.length()-PARFOR_PROG_END.length()).trim();
-
- Program prog = new Program();
- HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id);
-
- for( Entry<String,FunctionProgramBlock> e : fc.entrySet() )
- {
- String[] keypart = e.getKey().split( Program.KEY_DELIM );
- String namespace = keypart[0];
- String name = keypart[1];
-
- prog.addFunctionProgramBlock(namespace, name, e.getValue());
- }
-
- return prog;
- }
-
- private static LocalVariableMap parseVariables(String in) {
- LocalVariableMap ret = null;
-
- if( in.length()> PARFOR_VARS_BEGIN.length() + PARFOR_VARS_END.length())
- {
- String varStr = in.substring( PARFOR_VARS_BEGIN.length(),in.length()-PARFOR_VARS_END.length()).trim();
- ret = LocalVariableMap.deserialize(varStr);
- }
- else //empty input symbol table
- {
- ret = new LocalVariableMap();
- }
-
- return ret;
- }
-
- private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) {
- HashMap<String,FunctionProgramBlock> ret = new HashMap<>();
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM );
-
- while( st.hasMoreTokens() )
- {
- String lvar = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use
-
- //put first copy into prog (for direct use)
- int index = lvar.indexOf( KEY_VALUE_DELIM );
- String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id;
- String tmp2 = lvar.substring(index + 1);
- ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
- }
-
- return ret;
- }
-
- private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) {
- ArrayList<ProgramBlock> pbs = new ArrayList<>();
- String tmpdata = in.substring(PARFOR_PBS_BEGIN.length(),in.length()-PARFOR_PBS_END.length());
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM);
-
- while( st.hasMoreTokens() )
- {
- String tmp = st.nextToken();
- pbs.add( rParseProgramBlock( tmp, prog, id ) );
- }
-
- return pbs;
- }
-
- private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) {
- ProgramBlock pb = null;
-
- if( in.startsWith( PARFOR_PB_WHILE ) )
- pb = rParseWhileProgramBlock( in, prog, id );
- else if ( in.startsWith(PARFOR_PB_FOR ) )
- pb = rParseForProgramBlock( in, prog, id );
- else if ( in.startsWith(PARFOR_PB_PARFOR ) )
- pb = rParseParForProgramBlock( in, prog, id );
- else if ( in.startsWith(PARFOR_PB_IF ) )
- pb = rParseIfProgramBlock( in, prog, id );
- else if ( in.startsWith(PARFOR_PB_FC ) )
- pb = rParseFunctionProgramBlock( in, prog, id );
- else if ( in.startsWith(PARFOR_PB_EFC ) )
- pb = rParseExternalFunctionProgramBlock( in, prog, id );
- else if ( in.startsWith(PARFOR_PB_BEGIN ) )
- pb = rParseGenericProgramBlock( in, prog, id );
- else
- throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in );
-
- return pb;
- }
-
- private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) {
- String lin = in.substring( PARFOR_PB_WHILE.length(),in.length()-PARFOR_PB_END.length());
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-
- //predicate instructions
- ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-
- //exit instructions
- ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
-
- //program blocks
- ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-
- WhileProgramBlock wpb = new WhileProgramBlock(prog,inst);
- wpb.setExitInstructions2(exit);
- wpb.setChildBlocks(pbs);
-
- return wpb;
- }
-
- private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) {
- String lin = in.substring( PARFOR_PB_FOR.length(),in.length()-PARFOR_PB_END.length());
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-
- //inputs
- String iterVar = st.nextToken();
-
- //instructions
- ArrayList<Instruction> from = parseInstructions(st.nextToken(),id);
- ArrayList<Instruction> to = parseInstructions(st.nextToken(),id);
- ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id);
-
- //exit instructions
- ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
-
- //program blocks
- ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-
- ForProgramBlock fpb = new ForProgramBlock(prog, iterVar);
- fpb.setFromInstructions(from);
- fpb.setToInstructions(to);
- fpb.setIncrementInstructions(incr);
- fpb.setExitInstructions(exit);
- fpb.setChildBlocks(pbs);
-
- return fpb;
- }
-
- private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) {
- String lin = in.substring( PARFOR_PB_PARFOR.length(),in.length()-PARFOR_PB_END.length());
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-
- //inputs
- String iterVar = st.nextToken();
- ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken());
- HashMap<String,String> params = parseStringHashMap(st.nextToken());
-
- //instructions
- ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0);
- ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0);
- ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0);
-
- //exit instructions
- ArrayList<Instruction> exit = parseInstructions(st.nextToken(), 0);
-
- //program blocks //reset id to preinit state, replaced during exec
- ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0);
-
- ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars);
- pfpb.disableOptimization(); //already done in top-level parfor
- pfpb.setFromInstructions(from);
- pfpb.setToInstructions(to);
- pfpb.setIncrementInstructions(incr);
- pfpb.setExitInstructions(exit);
- pfpb.setChildBlocks(pbs);
-
- return pfpb;
- }
-
- private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) {
- String lin = in.substring( PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length());
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-
- //predicate instructions
- ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-
- //exit instructions
- ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
-
- //program blocks: if and else
- ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id);
- ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id);
-
- IfProgramBlock ipb = new IfProgramBlock(prog,inst);
- ipb.setExitInstructions2(exit);
- ipb.setChildBlocksIfBody(pbs1);
- ipb.setChildBlocksElseBody(pbs2);
-
- return ipb;
- }
-
- private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) {
- String lin = in.substring( PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length());
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-
- //inputs and outputs
- ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
- ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
-
- //instructions
- ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-
- //program blocks
- ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-
- ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
- ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
- FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2);
- fpb.setInstructions(inst);
- fpb.setChildBlocks(pbs);
-
- return fpb;
- }
-
- private static ExternalFunctionProgramBlock rParseExternalFunctionProgramBlock( String in, Program prog, int id ) {
- String lin = in.substring( PARFOR_PB_EFC.length(),in.length()-PARFOR_PB_END.length());
- HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-
- //LocalVariableMap vars = parseVariables(st.nextToken());
-
- //inputs, outputs and params
- ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
- ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
- HashMap<String,String> dat3 = parseStringHashMap(st.nextToken());
-
- //basedir
- String basedir = st.nextToken();
-
- //instructions (required for removing INST BEGIN, END)
- parseInstructions(st.nextToken(),id);
-
- //program blocks
- ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-
- ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
- ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
-
- //only CP external functions, because no nested MR jobs for reblocks
- ExternalFunctionProgramBlockCP efpb = new ExternalFunctionProgramBlockCP(prog, tmp1, tmp2, dat3, basedir);
- efpb.setChildBlocks(pbs);
-
- return efpb;
- }
-
- private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) {
- String lin = in.substring( PARFOR_PB_BEGIN.length(),in.length()-PARFOR_PB_END.length());
- StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM);
-
- ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-
- ProgramBlock pb = new ProgramBlock(prog);
- pb.setInstructions(inst);
-
- return pb;
- }
-
- private static ArrayList<Instruction> parseInstructions( String in, int id ) {
- ArrayList<Instruction> insts = new ArrayList<>();
- String lin = in.substring( PARFOR_INST_BEGIN.length(),in.length()-PARFOR_INST_END.length());
- StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM);
- while(st.hasMoreTokens()) {
- //Note that at this point only CP instructions and External function instruction can occur
- String instStr = st.nextToken();
- try {
- Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr);
- tmpinst = saveReplaceThreadID(tmpinst, CP_ROOT_THREAD_ID, CP_CHILD_THREAD+id );
- insts.add( tmpinst );
- }
- catch(Exception ex) {
- throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex);
- }
- }
- return insts;
- }
-
- private static ArrayList<ResultVar> parseResultVariables(String in) {
- ArrayList<ResultVar> ret = new ArrayList<>();
- for(String var : parseStringArrayList(in)) {
- boolean accum = var.endsWith("+");
- ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum));
- }
- return ret;
- }
-
- private static HashMap<String,String> parseStringHashMap( String in ) {
- HashMap<String,String> vars = new HashMap<>();
- StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
- while( st.hasMoreTokens() ) {
- String lin = st.nextToken();
- int index = lin.indexOf( KEY_VALUE_DELIM );
- String tmp1 = lin.substring(0, index);
- String tmp2 = lin.substring(index + 1);
- vars.put(tmp1, tmp2);
- }
- return vars;
- }
-
- private static ArrayList<String> parseStringArrayList( String in )
- {
- ArrayList<String> vars = new ArrayList<>();
- StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
- while( st.hasMoreTokens() ) {
- String tmp = st.nextToken();
- vars.add(tmp);
- }
-
- return vars;
- }
-
- private static ArrayList<DataIdentifier> parseDataIdentifiers( String in )
- {
- ArrayList<DataIdentifier> vars = new ArrayList<>();
- StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
- while( st.hasMoreTokens() ) {
- String tmp = st.nextToken();
- DataIdentifier dat = parseDataIdentifier( tmp );
- vars.add(dat);
- }
-
- return vars;
- }
-
- private static DataIdentifier parseDataIdentifier( String in )
- {
- StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM);
- String name = st.nextToken();
- DataType dt = DataType.valueOf(st.nextToken());
- ValueType vt = ValueType.valueOf(st.nextToken());
-
- DataIdentifier dat = new DataIdentifier(name);
- dat.setDataType(dt);
- dat.setValueType(vt);
-
- return dat;
- }
-
- /**
- * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and
- * related symbol tables can be hierarchically structured.
- *
- * @param in data object as string
- * @return array of objects
- */
- public static Object[] parseDataObject(String in) {
- Object[] ret = new Object[2];
-
- StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM );
- String name = st.nextToken();
- DataType datatype = DataType.valueOf( st.nextToken() );
- ValueType valuetype = ValueType.valueOf( st.nextToken() );
- String valString = st.hasMoreTokens() ? st.nextToken() : "";
- Data dat = null;
- switch( datatype )
- {
- case SCALAR:
- {
- switch ( valuetype )
- {
- case INT:
- dat = new IntObject(Long.parseLong(valString));
- break;
- case DOUBLE:
- dat = new DoubleObject(Double.parseDouble(valString));
- break;
- case BOOLEAN:
- dat = new BooleanObject(Boolean.parseBoolean(valString));
- break;
- case STRING:
- dat = new StringObject(valString);
- break;
- default:
- throw new DMLRuntimeException("Unable to parse valuetype "+valuetype);
- }
- break;
- }
- case MATRIX:
- {
- MatrixObject mo = new MatrixObject(valuetype,valString);
- long rows = Long.parseLong( st.nextToken() );
- long cols = Long.parseLong( st.nextToken() );
- int brows = Integer.parseInt( st.nextToken() );
- int bcols = Integer.parseInt( st.nextToken() );
- long nnz = Long.parseLong( st.nextToken() );
- InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() );
- OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() );
- PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() );
- UpdateType inplace = UpdateType.valueOf( st.nextToken() );
- MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz);
- MetaDataFormat md = new MetaDataFormat( mc, oin, iin );
- mo.setMetaData( md );
- if( partFormat._dpf != PDataPartitionFormat.NONE )
- mo.setPartitioned( partFormat._dpf, partFormat._N );
- mo.setUpdateType(inplace);
- dat = mo;
- break;
- }
- default:
- throw new DMLRuntimeException("Unable to parse datatype "+datatype);
- }
-
- ret[0] = name;
- ret[1] = dat;
- return ret;
- }
-
- private static ExecutionContext parseExecutionContext(String in, Program prog)
- {
- ExecutionContext ec = null;
-
- String lin = in.substring(PARFOR_EC_BEGIN.length(),in.length()-PARFOR_EC_END.length()).trim();
-
- if( !lin.equals( EMPTY ) )
- {
- LocalVariableMap vars = parseVariables(lin);
- ec = ExecutionContextFactory.createContext( false, prog );
- ec.setVariables(vars);
- }
-
- return ec;
- }
-
- private static void parseAndSetAdditionalConfigurations(String conf) {
- String[] statsFlag = conf.split("=");
- DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]);
- }
-
- //////////
- // CUSTOM SAFE LITERAL REPLACEMENT
-
- /**
- * In-place replacement of thread ids in filenames, functions names etc
- *
- * @param inst instruction
- * @param pattern ?
- * @param replacement string replacement
- * @return instruction
- */
- private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement )
- {
- //currently known, relevant instructions: createvar, rand, seq, extfunct,
- if( inst instanceof MRJobInstruction )
- {
- //update dims file, and internal string representations of rand/seq instructions
- MRJobInstruction mrinst = (MRJobInstruction)inst;
- mrinst.updateInstructionThreadID(pattern, replacement);
- }
- else if ( inst instanceof VariableCPInstruction ) //createvar, setfilename
- {
- //update in-memory representation
- inst.updateInstructionThreadID(pattern, replacement);
- }
- //NOTE> //Rand, seq in CP not required
- //else if( inst.toString().contains(pattern) )
- // throw new DMLRuntimeException( "DEBUG: Missed thread id replacement: "+inst );
-
- return inst;
- }
-
- public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace)
- {
- //save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path
- //replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end.
- int pos = fname.lastIndexOf(pattern);
- if( pos < 0 )
- return fname;
- return fname.substring(0, pos) + replace + fname.substring(pos+pattern.length());
- }
-
-
- //////////
- // CUSTOM HIERARCHICAL TOKENIZER
-
-
- /**
- * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to
- * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy
- * levels are skipped.
- *
- */
- private static class HierarchyAwareStringTokenizer //extends StringTokenizer
- {
- private String _str = null;
- private String _del = null;
- private int _off = -1;
-
- public HierarchyAwareStringTokenizer( String in, String delim )
- {
- //super(in);
- _str = in;
- _del = delim;
- _off = delim.length();
- }
-
- public boolean hasMoreTokens()
- {
- return (_str.length() > 0);
- }
-
- public String nextToken()
- {
- int nextDelim = determineNextSameLevelIndexOf(_str, _del);
- String token = null;
- if(nextDelim < 0)
- {
- nextDelim = _str.length();
- _off = 0;
- }
- token = _str.substring(0,nextDelim);
- _str = _str.substring( nextDelim + _off );
- return token;
- }
-
- private static int determineNextSameLevelIndexOf( String data, String pattern )
- {
- String tmpdata = data;
- int index = 0;
- int count = 0;
- int off=0,i1,i2,i3,min;
-
- while(true)
- {
- i1 = tmpdata.indexOf(pattern);
- i2 = tmpdata.indexOf(LEVELIN);
- i3 = tmpdata.indexOf(LEVELOUT);
-
- if( i1 < 0 ) return i1; //no pattern found at all
-
- min = i1; //min >= 0 by definition
- if( i2 >= 0 ) min = Math.min(min, i2);
- if( i3 >= 0 ) min = Math.min(min, i3);
-
- //stack maintenance
- if( i1 == min && count == 0 )
- return index+i1;
- else if( i2 == min )
- {
- count++;
- off = LEVELIN.length();
- }
- else if( i3 == min )
- {
- count--;
- off = LEVELOUT.length();
- }
-
- //prune investigated string
- index += min+off;
- tmpdata = tmpdata.substring(min+off);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index b51df84..1f25032 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -57,6 +57,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.utils.Statistics;
import org.apache.sysml.yarn.DMLAppMasterUtils;
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 885b2b7..66b4283 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -37,14 +37,13 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
import org.apache.sysml.runtime.instructions.cp.IntObject;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
import scala.Tuple2;
@@ -151,23 +150,9 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
_numTasks = 0;
_numIters = 0;
- //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might
- //share the process-local, i.e., per executor, buffer pool; hence we synchronize
- //the initialization and immediately register the created directory for cleanup
- //on process exit, i.e., executor exit, including any files created in the future.
- synchronized( CacheableData.class ) {
- if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) {
- //create id, executor working dir, and cache dir
- String uuid = IDHandler.createDistributedUniqueID();
- LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
- CacheableData.initCaching( uuid ); //incl activation and cache dir creation
- CacheableData.cacheEvictionLocalFilePrefix =
- CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
- //register entire working dir for delete on shutdown
- RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
- }
- }
-
+ //setup the buffer pool
+ RemoteParForUtils.setupBufferPool(_workerID);
+
//ensure that resultvar files are not removed
super.pinResultVariables();
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 6086d25..3f235cc 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -48,6 +48,7 @@ import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.utils.Statistics;
public class RemoteDPParWorkerReducer extends ParWorker
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index baec5a2..81a5e65 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -54,6 +54,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.utils.Statistics;
import org.apache.sysml.yarn.DMLAppMasterUtils;
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index ab29148..fd0b9eb 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -37,8 +37,7 @@ import org.apache.sysml.runtime.codegen.CodegenUtils;
import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
-import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.runtime.util.UtilFunctions;
import scala.Tuple2;
@@ -128,23 +127,9 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
.map(v -> v._name).collect(Collectors.toList()), _ec.getVarListPartitioned());
reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist, _brInputs, _cleanCache);
- //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might
- //share the process-local, i.e., per executor, buffer pool; hence we synchronize
- //the initialization and immediately register the created directory for cleanup
- //on process exit, i.e., executor exit, including any files created in the future.
- synchronized( CacheableData.class ) {
- if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) {
- //create id, executor working dir, and cache dir
- String uuid = IDHandler.createDistributedUniqueID();
- LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
- CacheableData.initCaching( uuid ); //incl activation and cache dir creation
- CacheableData.cacheEvictionLocalFilePrefix =
- CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
- //register entire working dir for delete on shutdown
- RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
- }
- }
-
+ //setup the buffer pool
+ RemoteParForUtils.setupBufferPool(_workerID);
+
//ensure that resultvar files are not removed
super.pinResultVariables();
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index fb2adc2..2cf35c7 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -45,8 +45,10 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.utils.Statistics;
/**
@@ -237,6 +239,29 @@ public class RemoteParForUtils
//create return array
return tmp.values().toArray(new LocalVariableMap[0]);
}
+
+ /**
+ * Init and register-cleanup of buffer pool
+ * @param workerID worker id
+ * @throws IOException exception
+ */
+ public static void setupBufferPool(long workerID) throws IOException {
+ //init and register-cleanup of buffer pool (in spark, multiple tasks might
+ //share the process-local, i.e., per executor, buffer pool; hence we synchronize
+ //the initialization and immediately register the created directory for cleanup
+ //on process exit, i.e., executor exit, including any files created in the future.
+ synchronized(CacheableData.class) {
+ if (!CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode()) {
+ //create id, executor working dir, and cache dir
+ String uuid = IDHandler.createDistributedUniqueID();
+ LocalFileUtils.createWorkingDirectoryWithUUID(uuid);
+ CacheableData.initCaching(uuid); //incl activation and cache dir creation
+ CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix + "_" + workerID;
+ //register entire working dir for delete on shutdown
+ RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+ }
+ }
+ }
/**
* Task to be registered as shutdown hook in order to delete the
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
index 7029061..0bdb92e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
@@ -42,6 +42,7 @@ import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.utils.Statistics;
/**
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index ed3b758..d345d01 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -81,7 +81,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.ResultMergeLocalFile;
import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType;
import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
@@ -438,7 +438,7 @@ public class OptimizerRuleBased extends Optimizer
_numEvaluatedPlans++;
LOG.debug(getOptMode()+" OPT: rewrite 'set data partitioner' - result="+pdp.toString()+
- " ("+ProgramConverter.serializeStringCollection(partitionedMatrices.keySet())+")" );
+ " ("+Arrays.toString(partitionedMatrices.keySet().toArray())+")" );
return blockwise;
}
@@ -1809,8 +1809,8 @@ public class OptimizerRuleBased extends Optimizer
pfpb.setRuntimePiggybacking(apply);
_numEvaluatedPlans++;
- LOG.debug(getOptMode()+" OPT: rewrite 'enable runtime piggybacking' - result="+apply+
- " ("+ProgramConverter.serializeStringCollection(sharedVars)+")" );
+ LOG.debug(getOptMode()+" OPT: rewrite 'enable runtime piggybacking' - result="
+ +apply+" ("+Arrays.toString(sharedVars.toArray())+")" );
}
protected boolean rHasSharedMRInput( OptNode n, Set<String> inputVars, Set<String> partitionedVars, HashSet<String> sharedVars )
@@ -1931,8 +1931,8 @@ public class OptimizerRuleBased extends Optimizer
}
_numEvaluatedPlans++;
- LOG.debug(getOptMode()+" OPT: rewrite 'inject spark input repartition' - result="+ret.size()+
- " ("+ProgramConverter.serializeStringCollection(ret)+")" );
+ LOG.debug(getOptMode()+" OPT: rewrite 'inject spark input repartition' - result="
+ +ret.size()+" ("+Arrays.toString(ret.toArray())+")" );
}
private void rCollectZipmmPartitioningCandidates( OptNode n, HashSet<String> cand )
@@ -2008,8 +2008,8 @@ public class OptimizerRuleBased extends Optimizer
}
_numEvaluatedPlans++;
- LOG.debug(getOptMode()+" OPT: rewrite 'set spark eager rdd caching' - result="+ret.size()+
- " ("+ProgramConverter.serializeStringCollection(ret)+")" );
+ LOG.debug(getOptMode()+" OPT: rewrite 'set spark eager rdd caching' - result="
+ +ret.size()+" ("+Arrays.toString(ret.toArray())+")" );
}
///////
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
index 65e9d3a..9d6f133 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/MRJobInstruction.java
@@ -36,7 +36,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.matrix.JobReturn;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
[4/4] systemml git commit: [SYSTEMML-2419] Paramserv spark function
shipping and worker setup
Posted by mb...@apache.org.
[SYSTEMML-2419] Paramserv spark function shipping and worker setup
Closes #799.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/cffefca3
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/cffefca3
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/cffefca3
Branch: refs/heads/master
Commit: cffefca30e89ce249c3030d23123c1b3aba1757a
Parents: 614adec
Author: EdgarLGB <gu...@atos.net>
Authored: Sun Jul 15 22:01:54 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun Jul 15 22:02:04 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/sysml/api/DMLScript.java | 4 +-
.../java/org/apache/sysml/hops/DataGenOp.java | 5 +-
.../org/apache/sysml/hops/OptimizerUtils.java | 3 +-
.../apache/sysml/hops/recompile/Recompiler.java | 5 +-
src/main/java/org/apache/sysml/lops/Lop.java | 2 +
.../java/org/apache/sysml/lops/compile/Dag.java | 3 +-
.../org/apache/sysml/parser/DMLTranslator.java | 3 +-
.../controlprogram/LocalVariableMap.java | 2 +-
.../controlprogram/ParForProgramBlock.java | 2 +-
.../context/ExecutionContext.java | 4 +
.../controlprogram/paramserv/PSWorker.java | 30 +-
.../paramserv/ParamservUtils.java | 35 +-
.../paramserv/spark/SparkPSBody.java | 46 +
.../paramserv/spark/SparkPSWorker.java | 43 +-
.../controlprogram/parfor/ProgramConverter.java | 1866 ------------------
.../controlprogram/parfor/RemoteDPParForMR.java | 1 +
.../parfor/RemoteDPParForSparkWorker.java | 23 +-
.../parfor/RemoteDPParWorkerReducer.java | 1 +
.../controlprogram/parfor/RemoteParForMR.java | 1 +
.../parfor/RemoteParForSparkWorker.java | 23 +-
.../parfor/RemoteParForUtils.java | 25 +
.../parfor/RemoteParWorkerMapper.java | 1 +
.../parfor/opt/OptimizerRuleBased.java | 16 +-
.../runtime/instructions/MRJobInstruction.java | 2 +-
.../cp/ParamservBuiltinCPInstruction.java | 34 +-
.../instructions/cp/VariableCPInstruction.java | 2 +-
.../sysml/runtime/util/ProgramConverter.java | 1838 +++++++++++++++++
.../paramserv/ParamservLocalNNTest.java | 93 +
.../functions/paramserv/ParamservNNTest.java | 94 -
.../paramserv/ParamservSparkNNTest.java | 47 +
.../functions/paramserv/SerializationTest.java | 80 +
.../parfor/ParForAdversarialLiteralsTest.java | 13 +-
.../paramserv/mnist_lenet_paramserv.dml | 4 +-
.../paramserv/paramserv-nn-asp-batch.dml | 2 +-
.../paramserv/paramserv-nn-asp-epoch.dml | 2 +-
.../paramserv/paramserv-nn-bsp-batch-dc.dml | 2 +-
.../paramserv/paramserv-nn-bsp-batch-dr.dml | 2 +-
.../paramserv/paramserv-nn-bsp-batch-drr.dml | 2 +-
.../paramserv/paramserv-nn-bsp-batch-or.dml | 2 +-
.../paramserv/paramserv-nn-bsp-epoch.dml | 2 +-
.../paramserv-spark-nn-bsp-batch-dc.dml | 53 +
.../functions/paramserv/ZPackageSuite.java | 3 +-
42 files changed, 2332 insertions(+), 2089 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/api/DMLScript.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java b/src/main/java/org/apache/sysml/api/DMLScript.java
index 227c14b..9c8a3eb 100644
--- a/src/main/java/org/apache/sysml/api/DMLScript.java
+++ b/src/main/java/org/apache/sysml/api/DMLScript.java
@@ -28,6 +28,7 @@ import java.io.InputStreamReader;
import java.net.URI;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -76,7 +77,6 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool;
@@ -853,7 +853,7 @@ public class DMLScript
LOG.debug("SystemML security check: "
+ "local.user.name = " + userName + ", "
- + "local.user.groups = " + ProgramConverter.serializeStringCollection(groupNames) + ", "
+ + "local.user.groups = " + Arrays.toString(groupNames.toArray()) + ", "
+ MRConfigurationNames.MR_JOBTRACKER_ADDRESS + " = " + job.get(MRConfigurationNames.MR_JOBTRACKER_ADDRESS) + ", "
+ MRConfigurationNames.MR_TASKTRACKER_TASKCONTROLLER + " = " + taskController + ","
+ MRConfigurationNames.MR_TASKTRACKER_GROUP + " = " + ttGroupName + ", "
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/hops/DataGenOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/DataGenOp.java b/src/main/java/org/apache/sysml/hops/DataGenOp.java
index 9e59235..b374185 100644
--- a/src/main/java/org/apache/sysml/hops/DataGenOp.java
+++ b/src/main/java/org/apache/sysml/hops/DataGenOp.java
@@ -34,7 +34,6 @@ import org.apache.sysml.parser.DataExpression;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.parser.Statement;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.util.UtilFunctions;
/**
@@ -108,8 +107,8 @@ public class DataGenOp extends MultiThreadedHop
//generate base dir
String scratch = ConfigurationManager.getScratchSpace();
- _baseDir = scratch + Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID() + Lop.FILE_SEPARATOR +
- Lop.FILE_SEPARATOR + ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
+ _baseDir = scratch + Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID() + Lop.FILE_SEPARATOR
+ + Lop.FILE_SEPARATOR + Lop.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
//compute unknown dims and nnz
refreshSizeInformation();
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 3f1a8cf..fb83df0 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -46,7 +46,6 @@ import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.functionobjects.IntegerDivide;
import org.apache.sysml.runtime.functionobjects.Modulus;
@@ -927,7 +926,7 @@ public class OptimizerUtils
public static String getUniqueTempFileName() {
return ConfigurationManager.getScratchSpace()
+ Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID()
- + Lop.FILE_SEPARATOR + ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR
+ + Lop.FILE_SEPARATOR + Lop.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR
+ Dag.getNextUniqueFilenameSuffix();
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index d5b0043..4175eac 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -82,7 +82,7 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.opt.OptTreeConverter;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.instructions.Instruction;
@@ -589,8 +589,7 @@ public class Recompiler
//update function names
if( hop instanceof FunctionOp && ((FunctionOp)hop).getFunctionType() != FunctionType.MULTIRETURN_BUILTIN) {
FunctionOp fop = (FunctionOp) hop;
- fop.setFunctionName( fop.getFunctionName() +
- ProgramConverter.CP_CHILD_THREAD + pid);
+ fop.setFunctionName( fop.getFunctionName() + Lop.CP_CHILD_THREAD + pid);
}
if( hop.getInput() != null )
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/lops/Lop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Lop.java b/src/main/java/org/apache/sysml/lops/Lop.java
index ff2d515..9e81496 100644
--- a/src/main/java/org/apache/sysml/lops/Lop.java
+++ b/src/main/java/org/apache/sysml/lops/Lop.java
@@ -77,6 +77,8 @@ public abstract class Lop
public static final String FILE_SEPARATOR = "/";
public static final String PROCESS_PREFIX = "_p";
+ public static final String CP_ROOT_THREAD_ID = "_t0";
+ public static final String CP_CHILD_THREAD = "_t";
//special delimiters w/ extended ASCII characters to avoid collisions
public static final String INSTRUCTION_DELIMITOR = "\u2021";
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index b1d6865..452f030 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -69,7 +69,6 @@ import org.apache.sysml.parser.Expression;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.StatementBlock;
import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.runtime.instructions.CPInstructionParser;
import org.apache.sysml.runtime.instructions.Instruction;
@@ -198,7 +197,7 @@ public class Dag<N extends Lop>
scratchFilePath = scratch + Lop.FILE_SEPARATOR
+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
- + ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
+ + Lop.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
}
return scratchFilePath;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index 089edce..b9e5f9d 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -88,7 +88,6 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysml.runtime.controlprogram.Program;
import org.apache.sysml.runtime.controlprogram.ProgramBlock;
import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.instructions.Instruction;
@@ -613,7 +612,7 @@ public class DMLTranslator
buff.append(Lop.PROCESS_PREFIX);
buff.append(DMLScript.getUUID());
buff.append(Lop.FILE_SEPARATOR);
- buff.append(ProgramConverter.CP_ROOT_THREAD_ID);
+ buff.append(Lop.CP_ROOT_THREAD_ID);
buff.append(Lop.FILE_SEPARATOR);
buff.append("PackageSupport");
buff.append(Lop.FILE_SEPARATOR);
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
index 2081aae..62ae3d0 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
@@ -28,7 +28,7 @@ import java.util.StringTokenizer;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.utils.Statistics;
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 83e9cde..ba490f3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -65,7 +65,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteSpark
import org.apache.sysml.runtime.controlprogram.parfor.LocalParWorker;
import org.apache.sysml.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysml.runtime.controlprogram.parfor.ParForBody;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
+import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.runtime.controlprogram.parfor.RemoteDPParForMR;
import org.apache.sysml.runtime.controlprogram.parfor.RemoteDPParForSpark;
import org.apache.sysml.runtime.controlprogram.parfor.RemoteParForJobReturn;
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index d0d0b08..2e0addf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -101,6 +101,10 @@ public class ExecutionContext {
public Program getProgram(){
return _prog;
}
+
+ public void setProgram(Program prog) {
+ _prog = prog;
+ }
public LocalVariableMap getVariables() {
return _variables;
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
index a76dfec..1ab5f5e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/PSWorker.java
@@ -34,23 +34,25 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction;
-@SuppressWarnings("unused")
public abstract class PSWorker {
-
- protected final int _workerID;
- protected final int _epochs;
- protected final long _batchSize;
- protected final ExecutionContext _ec;
- protected final ParamServer _ps;
- protected final DataIdentifier _output;
- protected final FunctionCallCPInstruction _inst;
+ protected int _workerID;
+ protected int _epochs;
+ protected long _batchSize;
+ protected ExecutionContext _ec;
+ protected ParamServer _ps;
+ protected DataIdentifier _output;
+ protected FunctionCallCPInstruction _inst;
protected MatrixObject _features;
protected MatrixObject _labels;
-
- private MatrixObject _valFeatures;
- private MatrixObject _valLabels;
- private final String _updFunc;
- protected final Statement.PSFrequency _freq;
+
+ protected MatrixObject _valFeatures;
+ protected MatrixObject _valLabels;
+ protected String _updFunc;
+ protected Statement.PSFrequency _freq;
+
+ protected PSWorker() {
+
+ }
protected PSWorker(int workerID, String updFunc, Statement.PSFrequency freq, int epochs, long batchSize,
MatrixObject valFeatures, MatrixObject valLabels, ExecutionContext ec, ParamServer ps) {
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
index ec6b6b2..ee15709 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -57,7 +57,6 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkAggregator;
import org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkMapper;
-import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
import org.apache.sysml.runtime.functionobjects.Plus;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.instructions.cp.ListObject;
@@ -68,6 +67,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysml.runtime.util.ProgramConverter;
import scala.Tuple2;
@@ -175,9 +175,8 @@ public class ParamservUtils {
new String[]{ns, name} : new String[]{ns, name};
}
- public static List<ExecutionContext> createExecutionContexts(ExecutionContext ec, LocalVariableMap varsMap,
- String updFunc, String aggFunc, int workerNum, int k) {
-
+ public static ExecutionContext createExecutionContext(ExecutionContext ec, LocalVariableMap varsMap, String updFunc,
+ String aggFunc, int k) {
FunctionProgramBlock updPB = getFunctionBlock(ec, updFunc);
FunctionProgramBlock aggPB = getFunctionBlock(ec, aggFunc);
@@ -188,27 +187,21 @@ public class ParamservUtils {
// 2. Recompile the imported function blocks
prog.getFunctionProgramBlocks().forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks()));
- // 3. Copy function for workers
- List<ExecutionContext> workerECs = IntStream.range(0, workerNum)
- .mapToObj(i -> {
- FunctionProgramBlock newUpdFunc = copyFunction(updFunc, updPB);
- FunctionProgramBlock newAggFunc = copyFunction(aggFunc, aggPB);
- Program newProg = new Program();
- putFunction(newProg, newUpdFunc);
- putFunction(newProg, newAggFunc);
- return ExecutionContextFactory.createContext(new LocalVariableMap(varsMap), newProg);
- })
- .collect(Collectors.toList());
-
- // 4. Copy function for agg service
+ // 3. Copy function
+ FunctionProgramBlock newUpdFunc = copyFunction(updFunc, updPB);
FunctionProgramBlock newAggFunc = copyFunction(aggFunc, aggPB);
Program newProg = new Program();
+ putFunction(newProg, newUpdFunc);
putFunction(newProg, newAggFunc);
- ExecutionContext aggEC = ExecutionContextFactory.createContext(new LocalVariableMap(varsMap), newProg);
+ return ExecutionContextFactory.createContext(new LocalVariableMap(varsMap), newProg);
+ }
- List<ExecutionContext> result = new ArrayList<>(workerECs);
- result.add(aggEC);
- return result;
+ public static List<ExecutionContext> copyExecutionContext(ExecutionContext ec, int num) {
+ return IntStream.range(0, num).mapToObj(i -> {
+ Program newProg = new Program();
+ ec.getProgram().getFunctionProgramBlocks().forEach((func, pb) -> putFunction(newProg, copyFunction(func, pb)));
+ return ExecutionContextFactory.createContext(new LocalVariableMap(ec.getVariables()), newProg);
+ }).collect(Collectors.toList());
}
private static FunctionProgramBlock copyFunction(String funcName, FunctionProgramBlock fpb) {
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java
new file mode 100644
index 0000000..ec10232
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSBody.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+
+/**
+ * Wrapper class containing everything needed to launch a remote Spark worker
+ */
+public class SparkPSBody {
+
+ private ExecutionContext _ec;
+
+ public SparkPSBody() {
+
+ }
+
+ public SparkPSBody(ExecutionContext ec) {
+ this._ec = ec;
+ }
+
+ public ExecutionContext getEc() {
+ return _ec;
+ }
+
+ public void setEc(ExecutionContext ec) {
+ this._ec = ec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/cffefca3/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
index 69da56c..466801f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
@@ -19,30 +19,59 @@
package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.sysml.parser.Statement;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer;
+import org.apache.sysml.runtime.codegen.CodegenUtils;
+import org.apache.sysml.runtime.controlprogram.paramserv.PSWorker;
+import org.apache.sysml.runtime.controlprogram.parfor.RemoteParForUtils;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.ProgramConverter;
import scala.Tuple2;
-public class SparkPSWorker implements VoidFunction<Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable {
+public class SparkPSWorker extends PSWorker implements VoidFunction<Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable {
private static final long serialVersionUID = -8674739573419648732L;
- public SparkPSWorker() {
+ private String _program;
+ private HashMap<String, byte[]> _clsMap;
+
+ protected SparkPSWorker() {
// No-args constructor used for deserialization
}
- public SparkPSWorker(String updFunc, Statement.PSFrequency freq, int epochs, long batchSize,
- MatrixObject valFeatures, MatrixObject valLabels, ExecutionContext ec, ParamServer ps) {
+ public SparkPSWorker(String updFunc, Statement.PSFrequency freq, int epochs, long batchSize, String program, HashMap<String, byte[]> clsMap) {
+ _updFunc = updFunc;
+ _freq = freq;
+ _epochs = epochs;
+ _batchSize = batchSize;
+ _program = program;
+ _clsMap = clsMap;
}
@Override
public void call(Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> input) throws Exception {
+ configureWorker(input);
+ }
+
+ private void configureWorker(Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> input) throws IOException {
+ _workerID = input._1;
+
+ // Initialize codegen class cache (before program parsing)
+ for (Map.Entry<String, byte[]> e : _clsMap.entrySet()) {
+ CodegenUtils.getClassSync(e.getKey(), e.getValue());
+ }
+
+ // Deserialize the body to initialize the execution context
+ SparkPSBody body = ProgramConverter.parseSparkPSBody(_program, _workerID);
+ _ec = body.getEc();
+
+ // Initialize the buffer pool and register it in the JVM shutdown hook so it is cleaned up at the end
+ RemoteParForUtils.setupBufferPool(_workerID);
}
}