You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/06/28 07:04:23 UTC
[incubator-nemo] branch master updated: [NEMO-16] Implement
collection of data from executor to client (#56)
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 3493fba [NEMO-16] Implement collection of data from executor to client (#56)
3493fba is described below
commit 3493fba3330eec76ddaac19e61a67de4258d3fc5
Author: Won Wook SONG <wo...@apache.org>
AuthorDate: Thu Jun 28 16:04:20 2018 +0900
[NEMO-16] Implement collection of data from executor to client (#56)
JIRA: [NEMO-16: Implement collection of data from executor to client](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-16)
**Major changes:**
- Gets rid of the file-writing and reading during the `collect` action of Spark. Instead, it brings the data from the executor through the driver, and then back to the client using the RPC implemented with [NEMO-103] (#45).
**Minor changes to note:**
- I've separated the MR Spark integration tests into a separate file, as the SparkJava integration tests file had become too large and crowded.
- A number of typos have been fixed
**Tests for the changes:**
- I've created a test for the `ContextImpl`, which had no unit tests before.
**Other comments:**
- N/A
resolves [NEMO-16](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-16)
---
.../main/java/edu/snu/nemo/client/JobLauncher.java | 17 ++++++-
.../main/java/edu/snu/nemo/common/ContextImpl.java | 13 +++++
.../nemo/common/ir/vertex/transform/Transform.java | 13 +++++
.../java/edu/snu/nemo/common/ContextImplTest.java | 55 ++++++++++++++++++++++
.../frontend/spark/core/SparkFrontendUtils.java | 43 ++---------------
.../frontend/spark/transform/CollectTransform.java | 32 +++----------
.../frontend/spark/transform/ReduceTransform.java | 17 -------
.../spark/{SparkScalaITCase.java => MRJava.java} | 42 +++++++++++------
.../spark/{SparkJavaITCase.java => SparkJava.java} | 49 +------------------
.../{SparkScalaITCase.java => SparkScala.java} | 2 +-
.../common/message/ncs/NcsMessageEnvironment.java | 1 +
runtime/common/src/main/proto/ControlMessage.proto | 12 ++++-
.../main/java/edu/snu/nemo/driver/NemoDriver.java | 1 +
.../edu/snu/nemo/runtime/executor/Executor.java | 4 +-
.../nemo/runtime/executor/task/TaskExecutor.java | 18 ++++++-
.../executor/datatransfer/DataTransferTest.java | 6 ++-
.../runtime/executor/task/TaskExecutorTest.java | 14 ++++--
.../edu/snu/nemo/runtime/master}/ClientRPC.java | 2 +-
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 10 ++++
.../edu/snu/nemo/client/ClientDriverRPCTest.java | 2 +-
20 files changed, 193 insertions(+), 160 deletions(-)
diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 5c330d7..b7cc03e 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -47,7 +47,9 @@ import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Base64;
+import java.util.List;
/**
* Job launcher.
@@ -60,6 +62,7 @@ public final class JobLauncher {
private static Configuration deployModeConf = null;
private static Configuration builtJobConf = null;
private static String serializedDAG;
+ private static List<?> collectedData = new ArrayList<>();
/**
* private constructor.
@@ -81,7 +84,10 @@ public final class JobLauncher {
.registerHandler(ControlMessage.DriverToClientMessageType.ResourceReady, event ->
driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
.setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
- .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build()).build()))
+ .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
+ .build()))
+ .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> collectedData.addAll(
+ SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
.run();
// Get Job and Driver Confs
@@ -299,4 +305,13 @@ public final class JobLauncher {
public static Configuration getBuiltJobConf() {
return builtJobConf;
}
+
+ /**
+ * Get the collected data.
+ *
+ * @return the collected data.
+ */
+ public static <T> List<T> getCollectedData() {
+ return (List<T>) collectedData;
+ }
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
index 56d1b4b..823090c 100644
--- a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
+++ b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
@@ -18,12 +18,14 @@ package edu.snu.nemo.common;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import java.util.Map;
+import java.util.Optional;
/**
* Transform Context Implementation.
*/
public final class ContextImpl implements Transform.Context {
private final Map sideInputs;
+ private String data;
/**
* Constructor of Context Implementation.
@@ -31,10 +33,21 @@ public final class ContextImpl implements Transform.Context {
*/
public ContextImpl(final Map sideInputs) {
this.sideInputs = sideInputs;
+ this.data = null;
}
@Override
public Map getSideInputs() {
return this.sideInputs;
}
+
+ @Override
+ public void setSerializedData(final String serializedData) {
+ this.data = serializedData;
+ }
+
+ @Override
+ public Optional<String> getSerializedData() {
+ return Optional.ofNullable(this.data);
+ }
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
index 10d0bde..448f76a 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
@@ -18,6 +18,7 @@ package edu.snu.nemo.common.ir.vertex.transform;
import edu.snu.nemo.common.ir.OutputCollector;
import java.io.Serializable;
import java.util.Map;
+import java.util.Optional;
/**
* Interface for specifying 'What' to do with data.
@@ -60,5 +61,17 @@ public interface Transform<I, O> extends Serializable {
* @return sideInputs.
*/
Map getSideInputs();
+
+ /**
+ * Put serialized data to send to the executor.
+ * @param serializedData the serialized data.
+ */
+ void setSerializedData(String serializedData);
+
+ /**
+ * Retrieve the serialized data on the executor.
+ * @return the serialized data.
+ */
+ Optional<String> getSerializedData();
}
}
diff --git a/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
new file mode 100644
index 0000000..5a89d78
--- /dev/null
+++ b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.common;
+
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link ContextImpl}.
+ */
+public class ContextImplTest {
+ private Transform.Context context;
+ private final Map sideInputs = new HashMap();
+
+ @Before
+ public void setUp() {
+ sideInputs.put("a", "b");
+ this.context = new ContextImpl(sideInputs);
+ }
+
+ @Test
+ public void testContextImpl() {
+ assertEquals(this.sideInputs, this.context.getSideInputs());
+
+ final String sampleText = "sample_text";
+
+ assertFalse(this.context.getSerializedData().isPresent());
+
+ this.context.setSerializedData(sampleText);
+ assertTrue(this.context.getSerializedData().isPresent());
+ assertEquals(sampleText, this.context.getSerializedData().get());
+ }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 08731a4..33fedd1 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -44,13 +44,7 @@ import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.TraversableOnce;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Stack;
+import java.util.*;
/**
* Utility class for RDDs.
@@ -95,11 +89,7 @@ public final class SparkFrontendUtils {
final Serializer serializer) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
- // save result in a temporary file
- // TODO #16: Implement collection of data from executor to client
- final String resultFile = System.getProperty("user.dir") + "/collectresult";
-
- final IRVertex collectVertex = new OperatorVertex(new CollectTransform<>(resultFile));
+ final IRVertex collectVertex = new OperatorVertex(new CollectTransform<>());
builder.addVertex(collectVertex, loopVertexStack);
final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
@@ -112,34 +102,7 @@ public final class SparkFrontendUtils {
// launch DAG
JobLauncher.launchDAG(builder.build());
- // Retrieve result data from file.
- // TODO #16: Implement collection of data from executor to client
- try {
- final List<T> result = new ArrayList<>();
- Integer i = 0;
-
- // TODO #16: Implement collection of data from executor to client
- File file = new File(resultFile + i);
- while (file.exists()) {
- try (
- final FileInputStream fis = new FileInputStream(file);
- final ObjectInputStream dis = new ObjectInputStream(fis)
- ) {
- final int size = dis.readInt(); // Read the number of collected T recorded in CollectTransform.
- for (int j = 0; j < size; j++) {
- result.add((T) dis.readObject());
- }
- }
-
- // Delete temporary file
- if (file.delete()) {
- file = new File(resultFile + ++i);
- }
- }
- return result;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ return (List<T>) JobLauncher.getCollectedData();
}
/**
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
index c1df3ba..2cbeb0a 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -17,56 +17,38 @@ package edu.snu.nemo.compiler.frontend.spark.transform;
import edu.snu.nemo.common.ir.OutputCollector;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.commons.lang3.SerializationUtils;
-import java.io.FileOutputStream;
-import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.List;
+import java.util.Base64;
/**
* Collect transform.
* @param <T> type of data to collect.
*/
public final class CollectTransform<T> implements Transform<T, T> {
- private String filename;
- private final List<T> list;
+ private final ArrayList<T> list;
+ private Context ctxt;
/**
* Constructor.
- *
- * @param filename file to keep the result in.
*/
- public CollectTransform(final String filename) {
- this.filename = filename;
+ public CollectTransform() {
this.list = new ArrayList<>();
}
@Override
public void prepare(final Context context, final OutputCollector<T> oc) {
- this.filename = filename + JavaRDD.getResultId();
+ this.ctxt = context;
}
@Override
public void onData(final T element) {
- // Write result to a temporary file.
- // TODO #16: Implement collection of data from executor to client
list.add(element);
}
@Override
public void close() {
- try (
- final FileOutputStream fos = new FileOutputStream(filename);
- final ObjectOutputStream oos = new ObjectOutputStream(fos)
- ) {
- // Write the length of list at first. This is needed internally and must not shown in the collected result.
- oos.writeInt(list.size());
- for (final T t : list) {
- oos.writeObject(t);
- }
- } catch (Exception e) {
- throw new RuntimeException("Exception while file closing in CollectTransform " + e);
- }
+ ctxt.setSerializedData(Base64.getEncoder().encodeToString(SerializationUtils.serialize(list)));
}
}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index c7dace1..e880b72 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -15,16 +15,11 @@
*/
package edu.snu.nemo.compiler.frontend.spark.transform;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
import edu.snu.nemo.common.ir.OutputCollector;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import javax.annotation.Nullable;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.util.Iterator;
/**
@@ -36,7 +31,6 @@ public final class ReduceTransform<T> implements Transform<T, T> {
private final Function2<T, T, T> func;
private OutputCollector<T> outputCollector;
private T result;
- private String filename;
/**
* Constructor.
@@ -45,7 +39,6 @@ public final class ReduceTransform<T> implements Transform<T, T> {
public ReduceTransform(final Function2<T, T, T> func) {
this.func = func;
this.result = null;
- this.filename = filename + JavaRDD.getResultId();
}
@Override
@@ -98,15 +91,5 @@ public final class ReduceTransform<T> implements Transform<T, T> {
@Override
public void close() {
- // Write result to a temporary file.
- // TODO #16: Implement collection of data from executor to client.
- try {
- final Kryo kryo = new Kryo();
- final Output output = new Output(new FileOutputStream(filename));
- kryo.writeClassAndObject(output, result);
- output.close();
- } catch (FileNotFoundException e) {
- throw new RuntimeException(e);
- }
}
}
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
similarity index 67%
copy from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
copy to examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index 5541e58..c4d178f 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package edu.snu.nemo.examples.spark;
import edu.snu.nemo.client.JobLauncher;
@@ -27,13 +28,13 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
- * Test Spark programs with JobLauncher.
+ * Test MR Spark programs with JobLauncher.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
@PowerMockIgnore("javax.management.*")
-public final class SparkScalaITCase {
- private static final int TIMEOUT = 120000;
+public final class MRJava {
+ private static final int TIMEOUT = 180000;
private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
private static final String executorResourceFileName = fileBasePath + "spark_sample_executor_resources.json";
@@ -45,29 +46,42 @@ public final class SparkScalaITCase {
}
@Test(timeout = TIMEOUT)
- public void testPi() throws Exception {
- final String numParallelism = "3";
+ public void testSparkWordCount() throws Exception {
+ final String inputFileName = "sample_input_wordcount_spark";
+ final String outputFileName = "sample_output_wordcount_spark";
+ final String testResourceFilename = "test_output_wordcount_spark";
+ final String inputFilePath = fileBasePath + inputFileName;
+ final String outputFilePath = fileBasePath + outputFileName;
JobLauncher.main(builder
- .addJobId(SparkPi.class.getSimpleName() + "_test")
- .addUserMain(SparkPi.class.getCanonicalName())
- .addUserArgs(numParallelism)
+ .addJobId(JavaWordCount.class.getSimpleName() + "_test")
+ .addUserMain(JavaWordCount.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
.build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
}
+ /* Temporary disabled because of Travis issue
@Test(timeout = TIMEOUT)
- public void testWordCount() throws Exception {
+ public void testSparkMapReduce() throws Exception {
final String inputFileName = "sample_input_wordcount_spark";
- final String outputFileName = "sample_output_wordcount_spark";
+ final String outputFileName = "sample_output_mr";
final String testResourceFilename = "test_output_wordcount_spark";
final String inputFilePath = fileBasePath + inputFileName;
final String outputFilePath = fileBasePath + outputFileName;
+ final String parallelism = "2";
+ final String runOnYarn = "false";
JobLauncher.main(builder
- .addJobId(SparkWordCount.class.getSimpleName() + "_test")
- .addUserMain(SparkWordCount.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
+ .addJobId(JavaMapReduce.class.getSimpleName() + "_test")
+ .addUserMain(JavaMapReduce.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath, parallelism, runOnYarn)
.addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
.build());
@@ -76,5 +90,5 @@ public final class SparkScalaITCase {
} finally {
ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
}
- }
+ }*/
}
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
similarity index 67%
rename from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
rename to examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
index 8aa1d48..132e517 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
@@ -34,7 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
@PowerMockIgnore("javax.management.*")
-public final class SparkJavaITCase {
+public final class SparkJava {
private static final int TIMEOUT = 180000;
private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
@@ -47,53 +47,6 @@ public final class SparkJavaITCase {
}
@Test(timeout = TIMEOUT)
- public void testSparkWordCount() throws Exception {
- final String inputFileName = "sample_input_wordcount_spark";
- final String outputFileName = "sample_output_wordcount_spark";
- final String testResourceFilename = "test_output_wordcount_spark";
- final String inputFilePath = fileBasePath + inputFileName;
- final String outputFilePath = fileBasePath + outputFileName;
-
- JobLauncher.main(builder
- .addJobId(JavaWordCount.class.getSimpleName() + "_test")
- .addUserMain(JavaWordCount.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
- .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
- .build());
-
- try {
- ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
- } finally {
- ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
- }
- }
-
- /* Temporary disabled because of Travis issue
- @Test(timeout = TIMEOUT)
- public void testSparkMapReduce() throws Exception {
- final String inputFileName = "sample_input_mr";
- final String outputFileName = "sample_output_mr";
- final String testResourceFileName = "test_output_mr";
- final String inputFilePath = fileBasePath + inputFileName;
- final String outputFilePath = fileBasePath + outputFileName;
- final String parallelism = "2";
- final String runOnYarn = "false";
-
- JobLauncher.main(builder
- .addJobId(JavaMapReduce.class.getSimpleName() + "_test")
- .addUserMain(JavaMapReduce.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath, parallelism, runOnYarn)
- .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
- .build());
-
- try {
- ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
- } finally {
- ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
- }
- }*/
-
- @Test(timeout = TIMEOUT)
public void testSparkPi() throws Exception {
final String numParallelism = "3";
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
similarity index 98%
rename from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
rename to examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
index 5541e58..2aaae7c 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
@@ -32,7 +32,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
@PowerMockIgnore("javax.management.*")
-public final class SparkScalaITCase {
+public final class SparkScala {
private static final int TIMEOUT = 120000;
private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 50211b3..3b7e44c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -196,6 +196,7 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
case BlockStateChanged:
case ExecutorFailed:
case DataSizeMetric:
+ case ExecutorDataCollected:
case MetricMessageReceived:
return MessageType.Send;
case RequestBlockLocation:
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index f6bd527..d662f66 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -32,13 +32,19 @@ message LaunchDAGMessage {
required string dag = 1;
}
+message DataCollectMessage {
+ required string data = 1;
+}
+
enum DriverToClientMessageType {
DriverStarted = 0;
ResourceReady = 1;
+ DataCollected = 2;
}
message DriverToClientMessage {
required DriverToClientMessageType type = 1;
+ optional DataCollectMessage dataCollected = 2;
}
enum MessageType {
@@ -50,12 +56,13 @@ enum MessageType {
BlockLocationInfo = 5;
ExecutorFailed = 6;
MetricMessageReceived = 7;
+ ExecutorDataCollected = 8;
}
message Message {
required MessageType type = 1;
required int64 id = 2;
- required string listenerId = 3; // The id of the message listner (handler).
+ required string listenerId = 3; // The id of the message listener (handler).
optional TaskStateChangedMsg taskStateChangedMsg = 4;
optional ScheduleTaskMsg scheduleTaskMsg = 5;
optional BlockStateChangedMsg blockStateChangedMsg = 6;
@@ -65,6 +72,7 @@ message Message {
optional ExecutorFailedMsg executorFailedMsg = 10;
optional ContainerFailedMsg containerFailedMsg = 11;
optional MetricMsg metricMsg = 12;
+ optional DataCollectMessage dataCollected = 13;
}
// Messages from Master to Executors
@@ -92,7 +100,7 @@ message TaskStateChangedMsg {
enum RecoverableFailureCause {
InputReadFailure = 0;
OutputWriteFailure = 1;
- // There is a 3rd cause: container_failure, but this is ommitted here as it is never propagated with a control msg.
+ // There is a 3rd cause: container_failure, but this is omitted here as it is never propagated with a control msg.
}
message BlockStateChangedMsg {
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 8102d64..d6c6ccc 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageParameters;
+import edu.snu.nemo.runtime.master.ClientRPC;
import edu.snu.nemo.runtime.master.RuntimeMaster;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.client.JobMessageObserver;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 69f5a17..f5a2e83 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -127,8 +127,8 @@ public final class Executor {
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
});
- new TaskExecutor(
- task, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+ new TaskExecutor(task, irDag, taskStateManager, dataTransferFactory,
+ metricMessageSender, persistentConnectionToMasterMap).execute();
} catch (final Exception e) {
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
ControlMessage.Message.newBuilder()
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 87f09c0..fe20ade 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -23,6 +23,9 @@ import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.vertex.*;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -65,6 +68,8 @@ public final class TaskExecutor {
// Dynamic optimization
private String idOfVertexPutOnHold;
+ private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
+
/**
* Constructor.
* @param task Task with information needed during execution.
@@ -77,7 +82,8 @@ public final class TaskExecutor {
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
final TaskStateManager taskStateManager,
final DataTransferFactory dataTransferFactory,
- final MetricMessageSender metricMessageSender) {
+ final MetricMessageSender metricMessageSender,
+ final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
// Essential information
this.isExecuted = false;
this.taskId = task.getTaskId();
@@ -91,6 +97,8 @@ public final class TaskExecutor {
// Assigning null is very bad, but we are keeping this for now
this.idOfVertexPutOnHold = null;
+ this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
+
// Prepare data structures
this.sideInputMap = new HashMap();
final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, dataTransferFactory);
@@ -407,6 +415,14 @@ public final class TaskExecutor {
Transform transform = ((OperatorVertex) irVertex).getTransform();
transform.close();
}
+ vertexHarness.getContext().getSerializedData().ifPresent(data ->
+ persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
+ ControlMessage.Message.newBuilder()
+ .setId(RuntimeIdGenerator.generateMessageId())
+ .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+ .setType(ControlMessage.MessageType.ExecutorDataCollected)
+ .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(data).build())
+ .build()));
}
////////////////////////////////////////////// Misc
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 73f531c..846bcca 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -43,6 +43,7 @@ import edu.snu.nemo.runtime.executor.Executor;
import edu.snu.nemo.runtime.executor.MetricManagerWorker;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
import edu.snu.nemo.runtime.executor.data.SerializerManager;
+import edu.snu.nemo.runtime.master.ClientRPC;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -90,7 +91,7 @@ import static org.mockito.Mockito.mock;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class,
- SourceVertex.class})
+ SourceVertex.class, ClientRPC.class})
public final class DataTransferTest {
private static final String EXECUTOR_ID_PREFIX = "Executor";
private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
@@ -136,11 +137,12 @@ public final class DataTransferTest {
final Scheduler scheduler = new BatchSingleJobScheduler(
schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
final AtomicInteger executorCount = new AtomicInteger(0);
+ final ClientRPC clientRPC = mock(ClientRPC.class);
// Necessary for wiring up the message environments
final RuntimeMaster runtimeMaster =
new RuntimeMaster(scheduler, containerManager, master,
- metricMessageHandler, messageEnvironment, EMPTY_DAG_DIRECTORY);
+ metricMessageHandler, messageEnvironment, clientRPC, EMPTY_DAG_DIRECTORY);
final Injector injector1 = Tang.Factory.getTang().newInjector();
injector1.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 5e0a267..0c58744 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -28,6 +28,7 @@ import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -65,7 +66,7 @@ import static org.mockito.Mockito.*;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class,
- TaskStateManager.class, StageEdge.class})
+ TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class})
public final class TaskExecutorTest {
private static final int DATA_SIZE = 100;
private static final ExecutionPropertyMap<VertexExecutionProperty> TASK_EXECUTION_PROPERTY_MAP
@@ -76,6 +77,7 @@ public final class TaskExecutorTest {
private DataTransferFactory dataTransferFactory;
private TaskStateManager taskStateManager;
private MetricMessageSender metricMessageSender;
+ private PersistentConnectionToMasterMap persistentConnectionToMasterMap;
private AtomicInteger stageId;
private String generateTaskId() {
@@ -101,6 +103,8 @@ public final class TaskExecutorTest {
metricMessageSender = mock(MetricMessageSender.class);
doNothing().when(metricMessageSender).send(anyString(), anyString());
doNothing().when(metricMessageSender).close();
+
+ persistentConnectionToMasterMap = mock(PersistentConnectionToMasterMap.class);
}
private boolean checkEqualElements(final List<Integer> left, final List<Integer> right) {
@@ -147,7 +151,7 @@ public final class TaskExecutorTest {
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
taskExecutor.execute();
// Check the output.
@@ -177,7 +181,7 @@ public final class TaskExecutorTest {
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
taskExecutor.execute();
// Check the output.
@@ -215,7 +219,7 @@ public final class TaskExecutorTest {
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
taskExecutor.execute();
// Check the output.
@@ -247,7 +251,7 @@ public final class TaskExecutorTest {
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
taskExecutor.execute();
// Check the output.
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
similarity index 99%
rename from runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
index 82698f0..2da083c 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.driver;
+package edu.snu.nemo.runtime.master;
import com.google.protobuf.InvalidProtocolBufferException;
import edu.snu.nemo.conf.JobConf;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index a7df05a..4529a0b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -78,6 +78,7 @@ public final class RuntimeMaster {
private final MetricMessageHandler metricMessageHandler;
private final MessageEnvironment masterMessageEnvironment;
private final Map<Integer, Long> aggregatedMetricData;
+ private final ClientRPC clientRPC;
// For converting json data. This is a thread safe.
private final ObjectMapper objectMapper;
@@ -94,6 +95,7 @@ public final class RuntimeMaster {
final BlockManagerMaster blockManagerMaster,
final MetricMessageHandler metricMessageHandler,
final MessageEnvironment masterMessageEnvironment,
+ final ClientRPC clientRPC,
@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
// We would like to use a single thread for runtime master operations
// since the processing logic in master takes a very short amount of time
@@ -107,6 +109,7 @@ public final class RuntimeMaster {
this.masterMessageEnvironment = masterMessageEnvironment;
this.masterMessageEnvironment
.setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());
+ this.clientRPC = clientRPC;
this.dagDirectory = dagDirectory;
this.irVertices = new HashSet<>();
this.resourceRequestCount = new AtomicInteger(0);
@@ -300,6 +303,13 @@ public final class RuntimeMaster {
metricList.forEach(metric ->
metricMessageHandler.onMetricMessageReceived(metric.getMetricKey(), metric.getMetricValue()));
break;
+ case ExecutorDataCollected:
+ final String serializedData = message.getDataCollected().getData();
+ clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
+ .setType(ControlMessage.DriverToClientMessageType.DataCollected)
+ .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(serializedData).build())
+ .build());
+ break;
default:
throw new IllegalMessageException(
new Exception("This message should not be received by Master :" + message.getType()));
diff --git a/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java b/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
index af94b59..6122de1 100644
--- a/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
+++ b/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.client;
-import edu.snu.nemo.driver.ClientRPC;
+import edu.snu.nemo.runtime.master.ClientRPC;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;