You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/06/28 07:04:23 UTC

[incubator-nemo] branch master updated: [NEMO-16] Implement collection of data from executor to client (#56)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 3493fba  [NEMO-16] Implement collection of data from executor to client (#56)
3493fba is described below

commit 3493fba3330eec76ddaac19e61a67de4258d3fc5
Author: Won Wook SONG <wo...@apache.org>
AuthorDate: Thu Jun 28 16:04:20 2018 +0900

    [NEMO-16] Implement collection of data from executor to client (#56)
    
    JIRA: [NEMO-16: Implement collection of data from executor to client](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-16)
    
    **Major changes:**
    - Removes the temporary-file writing and reading previously used by the `collect` action of Spark. Instead, the collected data is sent from the executor through the driver and then on to the client, using the RPC implemented in [NEMO-103] (#45).
    
    **Minor changes to note:**
    - I've moved the MR Spark integration tests into a separate file, as the SparkJava integration tests file had become too large and crowded.
    - A number of typos have been fixed
    
    **Tests for the changes:**
    - I've created a test for the `ContextImpl`, which had no unit tests before.
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-16](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-16)
---
 .../main/java/edu/snu/nemo/client/JobLauncher.java | 17 ++++++-
 .../main/java/edu/snu/nemo/common/ContextImpl.java | 13 +++++
 .../nemo/common/ir/vertex/transform/Transform.java | 13 +++++
 .../java/edu/snu/nemo/common/ContextImplTest.java  | 55 ++++++++++++++++++++++
 .../frontend/spark/core/SparkFrontendUtils.java    | 43 ++---------------
 .../frontend/spark/transform/CollectTransform.java | 32 +++----------
 .../frontend/spark/transform/ReduceTransform.java  | 17 -------
 .../spark/{SparkScalaITCase.java => MRJava.java}   | 42 +++++++++++------
 .../spark/{SparkJavaITCase.java => SparkJava.java} | 49 +------------------
 .../{SparkScalaITCase.java => SparkScala.java}     |  2 +-
 .../common/message/ncs/NcsMessageEnvironment.java  |  1 +
 runtime/common/src/main/proto/ControlMessage.proto | 12 ++++-
 .../main/java/edu/snu/nemo/driver/NemoDriver.java  |  1 +
 .../edu/snu/nemo/runtime/executor/Executor.java    |  4 +-
 .../nemo/runtime/executor/task/TaskExecutor.java   | 18 ++++++-
 .../executor/datatransfer/DataTransferTest.java    |  6 ++-
 .../runtime/executor/task/TaskExecutorTest.java    | 14 ++++--
 .../edu/snu/nemo/runtime/master}/ClientRPC.java    |  2 +-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java | 10 ++++
 .../edu/snu/nemo/client/ClientDriverRPCTest.java   |  2 +-
 20 files changed, 193 insertions(+), 160 deletions(-)

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 5c330d7..b7cc03e 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -47,7 +47,9 @@ import java.lang.reflect.Modifier;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Base64;
+import java.util.List;
 
 /**
  * Job launcher.
@@ -60,6 +62,7 @@ public final class JobLauncher {
   private static Configuration deployModeConf = null;
   private static Configuration builtJobConf = null;
   private static String serializedDAG;
+  private static List<?> collectedData = new ArrayList<>();
 
   /**
    * private constructor.
@@ -81,7 +84,10 @@ public final class JobLauncher {
         .registerHandler(ControlMessage.DriverToClientMessageType.ResourceReady, event ->
           driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
               .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
-              .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build()).build()))
+              .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
+              .build()))
+        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> collectedData.addAll(
+            SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
         .run();
 
     // Get Job and Driver Confs
@@ -299,4 +305,13 @@ public final class JobLauncher {
   public static Configuration getBuiltJobConf() {
     return builtJobConf;
   }
+
+  /**
+   * Get the collected data.
+   *
+   * @return the collected data.
+   */
+  public static <T> List<T> getCollectedData() {
+    return (List<T>) collectedData;
+  }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
index 56d1b4b..823090c 100644
--- a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
+++ b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
@@ -18,12 +18,14 @@ package edu.snu.nemo.common;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Transform Context Implementation.
  */
 public final class ContextImpl implements Transform.Context {
   private final Map sideInputs;
+  private String data;
 
   /**
    * Constructor of Context Implementation.
@@ -31,10 +33,21 @@ public final class ContextImpl implements Transform.Context {
    */
   public ContextImpl(final Map sideInputs) {
     this.sideInputs = sideInputs;
+    this.data = null;
   }
 
   @Override
   public Map getSideInputs() {
     return this.sideInputs;
   }
+
+  @Override
+  public void setSerializedData(final String serializedData) {
+    this.data = serializedData;
+  }
+
+  @Override
+  public Optional<String> getSerializedData() {
+    return Optional.ofNullable(this.data);
+  }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
index 10d0bde..448f76a 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
@@ -18,6 +18,7 @@ package edu.snu.nemo.common.ir.vertex.transform;
 import edu.snu.nemo.common.ir.OutputCollector;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Interface for specifying 'What' to do with data.
@@ -60,5 +61,17 @@ public interface Transform<I, O> extends Serializable {
      * @return sideInputs.
      */
     Map getSideInputs();
+
+    /**
+     * Put serialized data to send to the executor.
+     * @param serializedData the serialized data.
+     */
+    void setSerializedData(String serializedData);
+
+    /**
+     * Retrieve the serialized data on the executor.
+     * @return the serialized data.
+     */
+    Optional<String> getSerializedData();
   }
 }
diff --git a/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
new file mode 100644
index 0000000..5a89d78
--- /dev/null
+++ b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.common;
+
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link ContextImpl}.
+ */
+public class ContextImplTest {
+  private Transform.Context context;
+  private final Map sideInputs = new HashMap();
+
+  @Before
+  public void setUp() {
+    sideInputs.put("a", "b");
+    this.context = new ContextImpl(sideInputs);
+  }
+
+  @Test
+  public void testContextImpl() {
+    assertEquals(this.sideInputs, this.context.getSideInputs());
+
+    final String sampleText = "sample_text";
+
+    assertFalse(this.context.getSerializedData().isPresent());
+
+    this.context.setSerializedData(sampleText);
+    assertTrue(this.context.getSerializedData().isPresent());
+    assertEquals(sampleText, this.context.getSerializedData().get());
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 08731a4..33fedd1 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -44,13 +44,7 @@ import scala.Tuple2;
 import scala.collection.JavaConverters;
 import scala.collection.TraversableOnce;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Stack;
+import java.util.*;
 
 /**
  * Utility class for RDDs.
@@ -95,11 +89,7 @@ public final class SparkFrontendUtils {
                                     final Serializer serializer) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
 
-    // save result in a temporary file
-    // TODO #16: Implement collection of data from executor to client
-    final String resultFile = System.getProperty("user.dir") + "/collectresult";
-
-    final IRVertex collectVertex = new OperatorVertex(new CollectTransform<>(resultFile));
+    final IRVertex collectVertex = new OperatorVertex(new CollectTransform<>());
     builder.addVertex(collectVertex, loopVertexStack);
 
     final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, collectVertex),
@@ -112,34 +102,7 @@ public final class SparkFrontendUtils {
     // launch DAG
     JobLauncher.launchDAG(builder.build());
 
-    // Retrieve result data from file.
-    // TODO #16: Implement collection of data from executor to client
-    try {
-      final List<T> result = new ArrayList<>();
-      Integer i = 0;
-
-      // TODO #16: Implement collection of data from executor to client
-      File file = new File(resultFile + i);
-      while (file.exists()) {
-        try (
-            final FileInputStream fis = new FileInputStream(file);
-            final ObjectInputStream dis = new ObjectInputStream(fis)
-        ) {
-          final int size = dis.readInt(); // Read the number of collected T recorded in CollectTransform.
-          for (int j = 0; j < size; j++) {
-            result.add((T) dis.readObject());
-          }
-        }
-
-        // Delete temporary file
-        if (file.delete()) {
-          file = new File(resultFile + ++i);
-        }
-      }
-      return result;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    return (List<T>) JobLauncher.getCollectedData();
   }
 
   /**
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
index c1df3ba..2cbeb0a 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -17,56 +17,38 @@ package edu.snu.nemo.compiler.frontend.spark.transform;
 
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.commons.lang3.SerializationUtils;
 
-import java.io.FileOutputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.Base64;
 
 /**
  * Collect transform.
  * @param <T> type of data to collect.
  */
 public final class CollectTransform<T> implements Transform<T, T> {
-  private String filename;
-  private final List<T> list;
+  private final ArrayList<T> list;
+  private Context ctxt;
 
   /**
    * Constructor.
-   *
-   * @param filename file to keep the result in.
    */
-  public CollectTransform(final String filename) {
-    this.filename = filename;
+  public CollectTransform() {
     this.list = new ArrayList<>();
   }
 
   @Override
   public void prepare(final Context context, final OutputCollector<T> oc) {
-    this.filename = filename + JavaRDD.getResultId();
+    this.ctxt = context;
   }
 
   @Override
   public void onData(final T element) {
-    // Write result to a temporary file.
-    // TODO #16: Implement collection of data from executor to client
     list.add(element);
   }
 
   @Override
   public void close() {
-    try (
-        final FileOutputStream fos = new FileOutputStream(filename);
-        final ObjectOutputStream oos = new ObjectOutputStream(fos)
-    ) {
-      // Write the length of list at first. This is needed internally and must not shown in the collected result.
-      oos.writeInt(list.size());
-      for (final T t : list) {
-        oos.writeObject(t);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Exception while file closing in CollectTransform " + e);
-    }
+    ctxt.setSerializedData(Base64.getEncoder().encodeToString(SerializationUtils.serialize(list)));
   }
 }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index c7dace1..e880b72 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -15,16 +15,11 @@
  */
 package edu.snu.nemo.compiler.frontend.spark.transform;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
 import org.apache.spark.api.java.function.Function2;
 
 import javax.annotation.Nullable;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.util.Iterator;
 
 /**
@@ -36,7 +31,6 @@ public final class ReduceTransform<T> implements Transform<T, T> {
   private final Function2<T, T, T> func;
   private OutputCollector<T> outputCollector;
   private T result;
-  private String filename;
 
   /**
    * Constructor.
@@ -45,7 +39,6 @@ public final class ReduceTransform<T> implements Transform<T, T> {
   public ReduceTransform(final Function2<T, T, T> func) {
     this.func = func;
     this.result = null;
-    this.filename = filename + JavaRDD.getResultId();
   }
 
   @Override
@@ -98,15 +91,5 @@ public final class ReduceTransform<T> implements Transform<T, T> {
 
   @Override
   public void close() {
-    // Write result to a temporary file.
-    // TODO #16: Implement collection of data from executor to client.
-    try {
-      final Kryo kryo = new Kryo();
-      final Output output = new Output(new FileOutputStream(filename));
-      kryo.writeClassAndObject(output, result);
-      output.close();
-    } catch (FileNotFoundException e) {
-      throw new RuntimeException(e);
-    }
   }
 }
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
similarity index 67%
copy from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
copy to examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index 5541e58..c4d178f 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package edu.snu.nemo.examples.spark;
 
 import edu.snu.nemo.client.JobLauncher;
@@ -27,13 +28,13 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
- * Test Spark programs with JobLauncher.
+ * Test MR Spark programs with JobLauncher.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 @PowerMockIgnore("javax.management.*")
-public final class SparkScalaITCase {
-  private static final int TIMEOUT = 120000;
+public final class MRJava {
+  private static final int TIMEOUT = 180000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
   private static final String executorResourceFileName = fileBasePath + "spark_sample_executor_resources.json";
@@ -45,29 +46,42 @@ public final class SparkScalaITCase {
   }
 
   @Test(timeout = TIMEOUT)
-  public void testPi() throws Exception {
-    final String numParallelism = "3";
+  public void testSparkWordCount() throws Exception {
+    final String inputFileName = "sample_input_wordcount_spark";
+    final String outputFileName = "sample_output_wordcount_spark";
+    final String testResourceFilename = "test_output_wordcount_spark";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath = fileBasePath + outputFileName;
 
     JobLauncher.main(builder
-        .addJobId(SparkPi.class.getSimpleName() + "_test")
-        .addUserMain(SparkPi.class.getCanonicalName())
-        .addUserArgs(numParallelism)
+        .addJobId(JavaWordCount.class.getSimpleName() + "_test")
+        .addUserMain(JavaWordCount.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath)
         .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
         .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
   }
 
+  /* Temporary disabled because of Travis issue
   @Test(timeout = TIMEOUT)
-  public void testWordCount() throws Exception {
+  public void testSparkMapReduce() throws Exception {
     final String inputFileName = "sample_input_wordcount_spark";
-    final String outputFileName = "sample_output_wordcount_spark";
+    final String outputFileName = "sample_output_mr";
     final String testResourceFilename = "test_output_wordcount_spark";
     final String inputFilePath = fileBasePath + inputFileName;
     final String outputFilePath = fileBasePath + outputFileName;
+    final String parallelism = "2";
+    final String runOnYarn = "false";
 
     JobLauncher.main(builder
-        .addJobId(SparkWordCount.class.getSimpleName() + "_test")
-        .addUserMain(SparkWordCount.class.getCanonicalName())
-        .addUserArgs(inputFilePath, outputFilePath)
+        .addJobId(JavaMapReduce.class.getSimpleName() + "_test")
+        .addUserMain(JavaMapReduce.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath, parallelism, runOnYarn)
         .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
         .build());
 
@@ -76,5 +90,5 @@ public final class SparkScalaITCase {
     } finally {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
-  }
+  }*/
 }
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
similarity index 67%
rename from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
rename to examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
index 8aa1d48..132e517 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJava.java
@@ -34,7 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 @PowerMockIgnore("javax.management.*")
-public final class SparkJavaITCase {
+public final class SparkJava {
   private static final int TIMEOUT = 180000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
@@ -47,53 +47,6 @@ public final class SparkJavaITCase {
   }
 
   @Test(timeout = TIMEOUT)
-  public void testSparkWordCount() throws Exception {
-    final String inputFileName = "sample_input_wordcount_spark";
-    final String outputFileName = "sample_output_wordcount_spark";
-    final String testResourceFilename = "test_output_wordcount_spark";
-    final String inputFilePath = fileBasePath + inputFileName;
-    final String outputFilePath = fileBasePath + outputFileName;
-
-    JobLauncher.main(builder
-        .addJobId(JavaWordCount.class.getSimpleName() + "_test")
-        .addUserMain(JavaWordCount.class.getCanonicalName())
-        .addUserArgs(inputFilePath, outputFilePath)
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
-        .build());
-
-    try {
-      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
-    } finally {
-      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
-    }
-  }
-
-  /* Temporary disabled because of Travis issue
-  @Test(timeout = TIMEOUT)
-  public void testSparkMapReduce() throws Exception {
-    final String inputFileName = "sample_input_mr";
-    final String outputFileName = "sample_output_mr";
-    final String testResourceFileName = "test_output_mr";
-    final String inputFilePath = fileBasePath + inputFileName;
-    final String outputFilePath = fileBasePath + outputFileName;
-    final String parallelism = "2";
-    final String runOnYarn = "false";
-
-    JobLauncher.main(builder
-        .addJobId(JavaMapReduce.class.getSimpleName() + "_test")
-        .addUserMain(JavaMapReduce.class.getCanonicalName())
-        .addUserArgs(inputFilePath, outputFilePath, parallelism, runOnYarn)
-        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
-        .build());
-
-    try {
-      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
-    } finally {
-      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
-    }
-  }*/
-
-  @Test(timeout = TIMEOUT)
   public void testSparkPi() throws Exception {
     final String numParallelism = "3";
 
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
similarity index 98%
rename from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
rename to examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
index 5541e58..2aaae7c 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScala.java
@@ -32,7 +32,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 @PowerMockIgnore("javax.management.*")
-public final class SparkScalaITCase {
+public final class SparkScala {
   private static final int TIMEOUT = 120000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 50211b3..3b7e44c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -196,6 +196,7 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
       case BlockStateChanged:
       case ExecutorFailed:
       case DataSizeMetric:
+      case ExecutorDataCollected:
       case MetricMessageReceived:
         return MessageType.Send;
       case RequestBlockLocation:
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index f6bd527..d662f66 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -32,13 +32,19 @@ message LaunchDAGMessage {
     required string dag = 1;
 }
 
+message DataCollectMessage {
+    required string data = 1;
+}
+
 enum DriverToClientMessageType {
     DriverStarted = 0;
     ResourceReady = 1;
+    DataCollected = 2;
 }
 
 message DriverToClientMessage {
     required DriverToClientMessageType type = 1;
+    optional DataCollectMessage dataCollected = 2;
 }
 
 enum MessageType {
@@ -50,12 +56,13 @@ enum MessageType {
     BlockLocationInfo = 5;
     ExecutorFailed = 6;
     MetricMessageReceived = 7;
+    ExecutorDataCollected = 8;
 }
 
 message Message {
     required MessageType type = 1;
     required int64 id = 2;
-    required string listenerId = 3; // The id of the message listner (handler).
+    required string listenerId = 3; // The id of the message listener (handler).
     optional TaskStateChangedMsg taskStateChangedMsg = 4;
     optional ScheduleTaskMsg scheduleTaskMsg = 5;
     optional BlockStateChangedMsg blockStateChangedMsg = 6;
@@ -65,6 +72,7 @@ message Message {
     optional ExecutorFailedMsg executorFailedMsg = 10;
     optional ContainerFailedMsg containerFailedMsg = 11;
     optional MetricMsg metricMsg = 12;
+    optional DataCollectMessage dataCollected = 13;
 }
 
 // Messages from Master to Executors
@@ -92,7 +100,7 @@ message TaskStateChangedMsg {
 enum RecoverableFailureCause {
     InputReadFailure = 0;
     OutputWriteFailure = 1;
-    // There is a 3rd cause: container_failure, but this is ommitted here as it is never propagated with a control msg.
+    // There is a 3rd cause: container_failure, but this is omitted here as it is never propagated with a control msg.
 }
 
 message BlockStateChangedMsg {
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 8102d64..d6c6ccc 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageParameters;
+import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.client.JobMessageObserver;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 69f5a17..f5a2e83 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -127,8 +127,8 @@ public final class Executor {
             e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       });
 
-      new TaskExecutor(
-          task, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+      new TaskExecutor(task, irDag, taskStateManager, dataTransferFactory,
+          metricMessageSender, persistentConnectionToMasterMap).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 87f09c0..fe20ade 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -23,6 +23,9 @@ import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -65,6 +68,8 @@ public final class TaskExecutor {
   // Dynamic optimization
   private String idOfVertexPutOnHold;
 
+  private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
+
   /**
    * Constructor.
    * @param task Task with information needed during execution.
@@ -77,7 +82,8 @@ public final class TaskExecutor {
                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                       final TaskStateManager taskStateManager,
                       final DataTransferFactory dataTransferFactory,
-                      final MetricMessageSender metricMessageSender) {
+                      final MetricMessageSender metricMessageSender,
+                      final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
     // Essential information
     this.isExecuted = false;
     this.taskId = task.getTaskId();
@@ -91,6 +97,8 @@ public final class TaskExecutor {
     // Assigning null is very bad, but we are keeping this for now
     this.idOfVertexPutOnHold = null;
 
+    this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
+
     // Prepare data structures
     this.sideInputMap = new HashMap();
     final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, dataTransferFactory);
@@ -407,6 +415,14 @@ public final class TaskExecutor {
       Transform transform = ((OperatorVertex) irVertex).getTransform();
       transform.close();
     }
+    vertexHarness.getContext().getSerializedData().ifPresent(data ->
+        persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
+            ControlMessage.Message.newBuilder()
+                .setId(RuntimeIdGenerator.generateMessageId())
+                .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+                .setType(ControlMessage.MessageType.ExecutorDataCollected)
+                .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(data).build())
+                .build()));
   }
 
   ////////////////////////////////////////////// Misc
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 73f531c..846bcca 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -43,6 +43,7 @@ import edu.snu.nemo.runtime.executor.Executor;
 import edu.snu.nemo.runtime.executor.MetricManagerWorker;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
+import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -90,7 +91,7 @@ import static org.mockito.Mockito.mock;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class,
-    SourceVertex.class})
+    SourceVertex.class, ClientRPC.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
   private static final InterTaskDataStoreProperty.Value MEMORY_STORE = InterTaskDataStoreProperty.Value.MemoryStore;
@@ -136,11 +137,12 @@ public final class DataTransferTest {
     final Scheduler scheduler = new BatchSingleJobScheduler(
         schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
+    final ClientRPC clientRPC = mock(ClientRPC.class);
 
     // Necessary for wiring up the message environments
     final RuntimeMaster runtimeMaster =
         new RuntimeMaster(scheduler, containerManager, master,
-            metricMessageHandler, messageEnvironment, EMPTY_DAG_DIRECTORY);
+            metricMessageHandler, messageEnvironment, clientRPC, EMPTY_DAG_DIRECTORY);
 
     final Injector injector1 = Tang.Factory.getTang().newInjector();
     injector1.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index 5e0a267..0c58744 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -28,6 +28,7 @@ import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -65,7 +66,7 @@ import static org.mockito.Mockito.*;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class,
-    TaskStateManager.class, StageEdge.class})
+    TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class})
 public final class TaskExecutorTest {
   private static final int DATA_SIZE = 100;
   private static final ExecutionPropertyMap<VertexExecutionProperty> TASK_EXECUTION_PROPERTY_MAP
@@ -76,6 +77,7 @@ public final class TaskExecutorTest {
   private DataTransferFactory dataTransferFactory;
   private TaskStateManager taskStateManager;
   private MetricMessageSender metricMessageSender;
+  private PersistentConnectionToMasterMap persistentConnectionToMasterMap;
   private AtomicInteger stageId;
 
   private String generateTaskId() {
@@ -101,6 +103,8 @@ public final class TaskExecutorTest {
     metricMessageSender = mock(MetricMessageSender.class);
     doNothing().when(metricMessageSender).send(anyString(), anyString());
     doNothing().when(metricMessageSender).close();
+
+    persistentConnectionToMasterMap = mock(PersistentConnectionToMasterMap.class);
   }
 
   private boolean checkEqualElements(final List<Integer> left, final List<Integer> right) {
@@ -147,7 +151,7 @@ public final class TaskExecutorTest {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
@@ -177,7 +181,7 @@ public final class TaskExecutorTest {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
@@ -215,7 +219,7 @@ public final class TaskExecutorTest {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
@@ -247,7 +251,7 @@ public final class TaskExecutorTest {
 
     // Execute the task.
     final TaskExecutor taskExecutor = new TaskExecutor(
-        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+        task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender, persistentConnectionToMasterMap);
     taskExecutor.execute();
 
     // Check the output.
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
similarity index 99%
rename from runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
index 82698f0..2da083c 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/ClientRPC.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.driver;
+package edu.snu.nemo.runtime.master;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import edu.snu.nemo.conf.JobConf;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index a7df05a..4529a0b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -78,6 +78,7 @@ public final class RuntimeMaster {
   private final MetricMessageHandler metricMessageHandler;
   private final MessageEnvironment masterMessageEnvironment;
   private final Map<Integer, Long> aggregatedMetricData;
+  private final ClientRPC clientRPC;
 
   // For converting json data. This is a thread safe.
   private final ObjectMapper objectMapper;
@@ -94,6 +95,7 @@ public final class RuntimeMaster {
                        final BlockManagerMaster blockManagerMaster,
                        final MetricMessageHandler metricMessageHandler,
                        final MessageEnvironment masterMessageEnvironment,
+                       final ClientRPC clientRPC,
                        @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
     // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
@@ -107,6 +109,7 @@ public final class RuntimeMaster {
     this.masterMessageEnvironment = masterMessageEnvironment;
     this.masterMessageEnvironment
         .setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());
+    this.clientRPC = clientRPC;
     this.dagDirectory = dagDirectory;
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
@@ -300,6 +303,13 @@ public final class RuntimeMaster {
         metricList.forEach(metric ->
             metricMessageHandler.onMetricMessageReceived(metric.getMetricKey(), metric.getMetricValue()));
         break;
+      case ExecutorDataCollected:
+        final String serializedData = message.getDataCollected().getData();
+        clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
+            .setType(ControlMessage.DriverToClientMessageType.DataCollected)
+            .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(serializedData).build())
+            .build());
+        break;
       default:
         throw new IllegalMessageException(
             new Exception("This message should not be received by Master :" + message.getType()));
diff --git a/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java b/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
index af94b59..6122de1 100644
--- a/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
+++ b/tests/src/test/java/edu/snu/nemo/client/ClientDriverRPCTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.driver.ClientRPC;
+import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;