You are viewing a plain-text version of this content. The canonical (HTML) version is available from the mailing-list archive.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/28 07:04:22 UTC

[GitHub] seojangho closed pull request #56: [NEMO-16] Implement collection of data from executor to client

seojangho closed pull request #56: [NEMO-16] Implement collection of data from executor to client
URL: https://github.com/apache/incubator-nemo/pull/56
 
 
   

This is a pull request merged from a forked repository. Because GitHub
hides the original diff of a foreign (forked) pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 5c330d7c..b7cc03e7 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -47,7 +47,9 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Base64;
+import java.util.List;
 
 /**
  * Job launcher.
@@ -60,6 +62,7 @@
   private static Configuration deployModeConf = null;
   private static Configuration builtJobConf = null;
   private static String serializedDAG;
+  private static List<?> collectedData = new ArrayList<>();
 
   /**
    * private constructor.
@@ -81,7 +84,10 @@ public static void main(final String[] args) throws Exception {
         .registerHandler(ControlMessage.DriverToClientMessageType.ResourceReady, event ->
           driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
               .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
-              .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build()).build()))
+              .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
+              .build()))
+        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> collectedData.addAll(
+            SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
         .run();
 
     // Get Job and Driver Confs
@@ -299,4 +305,13 @@ private static Configuration getExecutorResourceConf(final Configuration jobConf
   public static Configuration getBuiltJobConf() {
     return builtJobConf;
   }
+
+  /**
+   * Get the collected data.
+   *
+   * @return the collected data.
+   */
+  public static <T> List<T> getCollectedData() {
+    return (List<T>) collectedData;
+  }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
index 56d1b4bf..823090ca 100644
--- a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
+++ b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
@@ -18,12 +18,14 @@
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Transform Context Implementation.
  */
 public final class ContextImpl implements Transform.Context {
   private final Map sideInputs;
+  private String data;
 
   /**
    * Constructor of Context Implementation.
@@ -31,10 +33,21 @@
    */
   public ContextImpl(final Map sideInputs) {
     this.sideInputs = sideInputs;
+    this.data = null;
   }
 
   @Override
   public Map getSideInputs() {
     return this.sideInputs;
   }
+
+  @Override
+  public void setSerializedData(final String serializedData) {
+    this.data = serializedData;
+  }
+
+  @Override
+  public Optional<String> getSerializedData() {
+    return Optional.ofNullable(this.data);
+  }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
index 10d0bdee..448f76a5 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
@@ -18,6 +18,7 @@
 import edu.snu.nemo.common.ir.OutputCollector;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Interface for specifying 'What' to do with data.
@@ -60,5 +61,17 @@ default Object getTag() {
      * @return sideInputs.
      */
     Map getSideInputs();
+
+    /**
+     * Put serialized data to send to the executor.
+     * @param serializedData the serialized data.
+     */
+    void setSerializedData(String serializedData);
+
+    /**
+     * Retrieve the serialized data on the executor.
+     * @return the serialized data.
+     */
+    Optional<String> getSerializedData();
   }
 }
diff --git a/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
new file mode 100644
index 00000000..5a89d78a
--- /dev/null
+++ b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.common;
+
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link ContextImpl}.
+ */
+public class ContextImplTest {
+  private Transform.Context context;
+  private final Map sideInputs = new HashMap();
+
+  @Before
+  public void setUp() {
+    sideInputs.put("a", "b");
+    this.context = new ContextImpl(sideInputs);
+  }
+
+  @Test
+  public void testContextImpl() {
+    assertEquals(this.sideInputs, this.context.getSideInputs());
+
+    final String sampleText = "sample_text";
+
+    assertFalse(this.context.getSerializedData().isPresent());
+
+    this.context.setSerializedData(sampleText);
+    assertTrue(this.context.getSerializedData().isPresent());
+    assertEquals(sampleText, this.context.getSerializedData().get());
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 08731a48..33fedd1b 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -44,13 +44,7 @@
 import scala.collection.JavaConverters;
 import scala.collection.TraversableOnce;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Stack;
+import java.util.*;
 
 /**
  * Utility class for RDDs.
@@ -95,11 +89,7 @@ public static Serializer deriveSerializerFrom(final org.apache.spark.SparkContex
                                     final Serializer serializer) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
 
-    // save result in a temporary file
-    // TODO #16: Implement collection of data from executor to client
-    final String resultFile = System.getProperty("user.dir") + "/collectresult";
-
-    final IRVertex collectVertex = new OperatorVertex(new CollectTransform<>(resultFile));
+    final IRVertex collectVertex = new OperatorVertex(new CollectTransform<>());
     builder.addVertex(collectVertex, loopVertexStack);
 
     final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
@@ -112,34 +102,7 @@ public static Serializer deriveSerializerFrom(final org.apache.spark.SparkContex
     // launch DAG
     JobLauncher.launchDAG(builder.build());
 
-    // Retrieve result data from file.
-    // TODO #16: Implement collection of data from executor to client
-    try {
-      final List<T> result = new ArrayList<>();
-      Integer i = 0;
-
-      // TODO #16: Implement collection of data from executor to client
-      File file = new File(resultFile + i);
-      while (file.exists()) {
-        try (
-            final FileInputStream fis = new FileInputStream(file);
-            final ObjectInputStream dis = new ObjectInputStream(fis)
-        ) {
-          final int size = dis.readInt(); // Read the number of collected T recorded in CollectTransform.
-          for (int j = 0; j < size; j++) {
-            result.add((T) dis.readObject());
-          }
-        }
-
-        // Delete temporary file
-        if (file.delete()) {
-          file = new File(resultFile + ++i);
-        }
-      }
-      return result;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    return (List<T>) JobLauncher.getCollectedData();
   }
 
   /**
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
index c1df3ba6..2cbeb0a6 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -17,56 +17,38 @@
 
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.commons.lang3.SerializationUtils;
 
-import java.io.FileOutputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.Base64;
 
 /**
  * Collect transform.
  * @param <T> type of data to collect.
  */
 public final class CollectTransform<T> implements Transform<T, T> {
-  private String filename;
-  private final List<T> list;
+  private final ArrayList<T> list;
+  private Context ctxt;
 
   /**
    * Constructor.
-   *
-   * @param filename file to keep the result in.
    */
-  public CollectTransform(final String filename) {
-    this.filename = filename;
+  public CollectTransform() {
     this.list = new ArrayList<>();
   }
 
   @Override
   public void prepare(final Context context, final OutputCollector<T> oc) {
-    this.filename = filename + JavaRDD.getResultId();
+    this.ctxt = context;
   }
 
   @Override
   public void onData(final T element) {
-    // Write result to a temporary file.
-    // TODO #16: Implement collection of data from executor to client
     list.add(element);
   }
 
   @Override
   public void close() {
-    try (
-        final FileOutputStream fos = new FileOutputStream(filename);
-        final ObjectOutputStream oos = new ObjectOutputStream(fos)
-    ) {
-      // Write the length of list at first. This is needed internally and must not shown in the collected result.
-      oos.writeInt(list.size());
-      for (final T t : list) {
-        oos.writeObject(t);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Exception while file closing in CollectTransform " + e);
-    }
+    ctxt.setSerializedData(Base64.getEncoder().encodeToString(SerializationUtils.serialize(list)));
   }
 }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index c7dace1c..e880b726 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -15,16 +15,11 @@
  */
 package edu.snu.nemo.compiler.frontend.spark.transform;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
 import org.apache.spark.api.java.function.Function2;
 
 import javax.annotation.Nullable;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.util.Iterator;
 
 /**
@@ -36,7 +31,6 @@
   private final Function2<T, T, T> func;
   private OutputCollector<T> outputCollector;
   private T result;
-  private String filename;
 
   /**
    * Constructor.
@@ -45,7 +39,6 @@
   public ReduceTransform(final Function2<T, T, T> func) {
     this.func = func;
     this.result = null;
-    this.filename = filename + JavaRDD.getResultId();
   }
 
   @Override
@@ -98,15 +91,5 @@ public void onData(final T element) {
 
   @Override
   public void close() {
-    // Write result to a temporary file.
-    // TODO #16: Implement collection of data from executor to client.
-    try {
-      final Kryo kryo = new Kryo();
-      final Output output = new Output(new FileOutputStream(filename));
-      kryo.writeClassAndObject(output, result);
-      output.close();
-    } catch (FileNotFoundException e) {
-      throw new RuntimeException(e);
-    }
   }
 }
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
new file mode 100644
index 00000000..c4d178f4
--- /dev/null
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.examples.spark;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test MR Spark programs with JobLauncher.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+@PowerMockIgnore("javax.management.*")
+public final class MRJava {
+  private static final int TIMEOUT = 180000;
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+  private static final String executorResourceFileName = fileBasePath + "spark_sample_executor_resources.json";
+
+  @Before
+  public void setUp() {
+    builder = new ArgBuilder()
+        .addResourceJson(executorResourceFileName);
+  }
+
+  @Test(timeout = TIMEOUT)
+  public void testSparkWordCount() throws Exception {
+    final String inputFileName = "sample_input_wordcount_spark";
+    final String outputFileName = "sample_output_wordcount_spark";
+    final String testResourceFilename = "test_output_wordcount_spark";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath = fileBasePath + outputFileName;
+
+    JobLauncher.main(builder
+        .addJobId(JavaWordCount.class.getSimpleName() + "_test")
+        .addUserMain(JavaWordCount.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+
+  /* Temporary disabled because of Travis issue
+  @Test(timeout = TIMEOUT)
+  public void testSparkMapReduce() throws Exception {
+    final String inputFileName = "sample_input_wordcount_spark";
+    final String outputFileName = "sample_output_mr";
+    final String testResourceFilename = "test_output_wordcount_spark";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath = fileBasePath + outputFileName;
+    final String parallelism = "2";
+    final String runOnYarn = "false";
+
+    JobLauncher.main(builder
+        .addJobId(JavaMapReduce.class.getSimpleName() + "_test")
+        .addUserMain(JavaMapReduce.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath, parallelism, runOnYarn)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }*/
+}
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
similarity index 67%
rename from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
rename to examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
index 8aa1d48a..132e5170 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
@@ -34,7 +34,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 @PowerMockIgnore("javax.management.*")
-public final class SparkJavaITCase {
+public final class SparkJava {
   private static final int TIMEOUT = 180000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
@@ -46,53 +46,6 @@ public void setUp() {
         .addResourceJson(executorResourceFileName);
   }
 
-  @Test(timeout = TIMEOUT)
-  public void testSparkWordCount() throws Exception {
-    final String inputFileName = "sample_input_wordcount_spark";
-    final String outputFileName = "sample_output_wordcount_spark";
-    final String testResourceFilename = "test_output_wordcount_spark";
-    final String inputFilePath = fileBasePath + inputFileName;
-    final String outputFilePath = fileBasePath + outputFileName;
-
-    JobLauncher.main(builder
-        .addJobId(JavaWordCount.class.getSimpleName() + "_test")
-        .addUserMain(JavaWordCount.class.getCanonicalName())
-        .addUserArgs(inputFilePath, outputFilePath)
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
-        .build());
-
-    try {
-      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
-    } finally {
-      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
-    }
-  }
-
-  /* Temporary disabled because of Travis issue
-  @Test(timeout = TIMEOUT)
-  public void testSparkMapReduce() throws Exception {
-    final String inputFileName = "sample_input_mr";
-    final String outputFileName = "sample_output_mr";
-    final String testResourceFileName = "test_output_mr";
-    final String inputFilePath = fileBasePath + inputFileName;
-    final String outputFilePath = fileBasePath + outputFileName;
-    final String parallelism = "2";
-    final String runOnYarn = "false";
-
-    JobLauncher.main(builder
-        .addJobId(JavaMapReduce.class.getSimpleName() + "_test")
-        .addUserMain(JavaMapReduce.class.getCanonicalName())
-        .addUserArgs(inputFilePath, outputFilePath, parallelism, runOnYarn)
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
-        .build());
-
-    try {
-      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
-    } finally {
-      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
-    }
-  }*/
-
   @Test(timeout = TIMEOUT)
   public void testSparkPi() throws Exception {
     final String numParallelism = "3";
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
similarity index 98%
rename from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
rename to examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
index 5541e583..2aaae7c2 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
@@ -32,7 +32,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 @PowerMockIgnore("javax.management.*")
-public final class SparkScalaITCase {
+public final class SparkScala {
   private static final int TIMEOUT = 120000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 50211b32..3b7e44c1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -196,6 +196,7 @@ private MessageType getMsgType(final ControlMessage.Message controlMessage) {
       case BlockStateChanged:
       case ExecutorFailed:
       case DataSizeMetric:
+      case ExecutorDataCollected:
       case MetricMessageReceived:
         return MessageType.Send;
       case RequestBlockLocation:
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index f6bd527e..d662f661 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -32,13 +32,19 @@ message LaunchDAGMessage {
     required string dag = 1;
 }
 
+message DataCollectMessage {
+    required string data = 1;
+}
+
 enum DriverToClientMessageType {
     DriverStarted = 0;
     ResourceReady = 1;
+    DataCollected = 2;
 }
 
 message DriverToClientMessage {
     required DriverToClientMessageType type = 1;
+    optional DataCollectMessage dataCollected = 2;
 }
 
 enum MessageType {
@@ -50,12 +56,13 @@ enum MessageType {
     BlockLocationInfo = 5;
     ExecutorFailed = 6;
     MetricMessageReceived = 7;
+    ExecutorDataCollected = 8;
 }
 
 message Message {
     required MessageType type = 1;
     required int64 id = 2;
-    required string listenerId = 3; // The id of the message listner (handler).
+    required string listenerId = 3; // The id of the message listener (handler).
     optional TaskStateChangedMsg taskStateChangedMsg = 4;
     optional ScheduleTaskMsg scheduleTaskMsg = 5;
     optional BlockStateChangedMsg blockStateChangedMsg = 6;
@@ -65,6 +72,7 @@ message Message {
     optional ExecutorFailedMsg executorFailedMsg = 10;
     optional ContainerFailedMsg containerFailedMsg = 11;
     optional MetricMsg metricMsg = 12;
+    optional DataCollectMessage dataCollected = 13;
 }
 
 // Messages from Master to Executors
@@ -92,7 +100,7 @@ message TaskStateChangedMsg {
 enum RecoverableFailureCause {
     InputReadFailure = 0;
     OutputWriteFailure = 1;
-    // There is a 3rd cause: container_failure, but this is ommitted here as it is never propagated with a control msg.
+    // There is a 3rd cause: container_failure, but this is omitted here as it is never propagated with a control msg.
 }
 
 message BlockStateChangedMsg {
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 8102d645..d6c6ccc8 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -20,6 +20,7 @@
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageParameters;
+import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.client.JobMessageObserver;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 69f5a173..f5a2e835 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -127,8 +127,8 @@ private void launchTask(final Task task) {
             e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       });
 
-      new TaskExecutor(
-          task, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+      new TaskExecutor(task, irDag, taskStateManager, dataTransferFactory,
+          metricMessageSender, persistentConnectionToMasterMap).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 87f09c09..fe20ade9 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -23,6 +23,9 @@
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -65,6 +68,8 @@
   // Dynamic optimization
   private String idOfVertexPutOnHold;
 
+  private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
+
   /**
    * Constructor.
    * @param task Task with information needed during execution.
@@ -77,7 +82,8 @@ public TaskExecutor(final Task task,
                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                       final TaskStateManager taskStateManager,
                       final DataTransferFactory dataTransferFactory,
-                      final MetricMessageSender metricMessageSender) {
+                      final MetricMessageSender metricMessageSender,
+                      final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
     // Essential information
     this.isExecuted = false;
     this.taskId = task.getTaskId();
@@ -91,6 +97,8 @@ public TaskExecutor(final Task task,
     // Assigning null is very bad, but we are keeping this for now
     this.idOfVertexPutOnHold = null;
 
+    this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
+
     // Prepare data structures
     this.sideInputMap = new HashMap();
     final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, dataTransferFactory);
@@ -407,6 +415,14 @@ private void closeTransform(final VertexHarness vertexHarness) {
       Transform transform = ((OperatorVertex) irVertex).getTransform();
       transform.close();
     }
+    vertexHarness.getContext().getSerializedData().ifPresent(data ->
+        persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
+            ControlMessage.Message.newBuilder()
+                .setId(RuntimeIdGenerator.generateMessageId())
+                .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+                .setType(ControlMessage.MessageType.ExecutorDataCollected)
+                .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(data).build())
+                .build()));
   }
 
   ////////////////////////////////////////////// Misc
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 73f531c4..846bccab 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -43,6 +43,7 @@
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
+import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -90,7 +91,7 @@
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class,
-    SourceVertex.class})
+    SourceVertex.class, ClientRPC.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
   private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
@@ -136,11 +137,12 @@ public void setUp() throws InjectionException {
     final Scheduler scheduler = new BatchSingleJobScheduler(
         schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
+    final ClientRPC clientRPC = mock(ClientRPC.class);
 
     // Necessary for wiring up the message environments
     final RuntimeMaster runtimeMaster =
         new RuntimeMaster(scheduler, containerManager, master,
-            metricMessageHandler, messageEnvironment, EMPTY_DAG_DIRECTORY);
+            metricMessageHandler, messageEnvironment, clientRPC, EMPTY_DAG_DIRECTORY);
 
     final Injector injector1 = Tang.Factory.getTang().newInjector();
     injector1.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 5e0a267d..0c587445 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -28,6 +28,7 @@
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -65,7 +66,7 @@
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class,
-    TaskStateManager.class, StageEdge.class})
+    TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class})
 public final class TaskExecutorTest {
   private static final int DATA_SIZE = 100;
   private static final ExecutionPropertyMap<VertexExecutionProperty> TASK_EXECUTION_PROPERTY_MAP
@@ -76,6 +77,7 @@
   private DataTransferFactory dataTransferFactory;
   private TaskStateManager taskStateManager;
   private MetricMessageSender metricMessageSender;
+  private PersistentConnectionToMasterMap persistentConnectionToMasterMap;
   private AtomicInteger stageId;
 
   private String generateTaskId() {
@@ -101,6 +103,8 @@ public void setUp() throws Exception {
     metricMessageSender = mock(MetricMessageSender.class);
     doNothing().when(metricMessageSender).send(anyString(), anyString());
     doNothing().when(metricMessageSender).close();
+
+    persistentConnectionToMasterMap = mock(PersistentConnectionToMasterMap.class);
   }
 
   private boolean checkEqualElements(final List<Integer> left, final List<Integer> right) {
@@ -147,7 +151,7 @@ public Iterable read() throws IOException {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
@@ -177,7 +181,7 @@ public void testParentTaskDataFetching() throws Exception {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
@@ -215,7 +219,7 @@ public void testTwoOperators() throws Exception {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
@@ -247,7 +251,7 @@ public void testTwoOperatorsWithSideInput() throws Exception {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
similarity index 99%
rename from runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
index 82698f03..2da083c1 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.driver;
+package edu.snu.nemo.runtime.master;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import edu.snu.nemo.conf.JobConf;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index a7df05a0..4529a0b2 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -78,6 +78,7 @@
   private final MetricMessageHandler metricMessageHandler;
   private final MessageEnvironment masterMessageEnvironment;
   private final Map<Integer, Long> aggregatedMetricData;
+  private final ClientRPC clientRPC;
 
   // For converting json data. This is a thread safe.
   private final ObjectMapper objectMapper;
@@ -94,6 +95,7 @@ public RuntimeMaster(final Scheduler scheduler,
                        final BlockManagerMaster blockManagerMaster,
                        final MetricMessageHandler metricMessageHandler,
                        final MessageEnvironment masterMessageEnvironment,
+                       final ClientRPC clientRPC,
                        @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
     // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
@@ -107,6 +109,7 @@ public RuntimeMaster(final Scheduler scheduler,
     this.masterMessageEnvironment = masterMessageEnvironment;
     this.masterMessageEnvironment
         .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());
+    this.clientRPC = clientRPC;
     this.dagDirectory = dagDirectory;
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
@@ -300,6 +303,13 @@ private void handleControlMessage(final ControlMessage.Message message) {
         metricList.forEach(metric ->
             metricMessageHandler.onMetricMessageReceived(metric.getMetricKey(), metric.getMetricValue()));
         break;
+      case ExecutorDataCollected:
+        final String serializedData = message.getDataCollected().getData();
+        clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
+            .setType(ControlMessage.DriverToClientMessageType.DataCollected)
+            .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(serializedData).build())
+            .build());
+        break;
       default:
         throw new IllegalMessageException(
             new Exception("This message should not be received by Master :" + message.getType()));
diff --git a/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java b/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
index af94b597..6122de1c 100644
--- a/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
+++ b/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.driver.ClientRPC;
+import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services