You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/07/19 10:39:58 UTC

[incubator-nemo] branch master updated: [NEMO-62] Support Multiple DAG Submission in a Single User Program (#73)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0da3f85  [NEMO-62] Support Multiple DAG Submission in a Single User Program (#73)
0da3f85 is described below

commit 0da3f850616f527b619203ed0a142f60ce2a3936
Author: Won Wook SONG <wo...@apache.org>
AuthorDate: Thu Jul 19 19:39:55 2018 +0900

    [NEMO-62] Support Multiple DAG Submission in a Single User Program (#73)
    
    JIRA: [NEMO-62: Support Multiple Jobs Submission in a Single User Program](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-62)
    
    **Major changes:**
    - The client and the driver now interact to notify each other of their status while executing the job, enabling the client to submit DAGs to the driver multiple times.
    - At the moment, the scheduler does not handle multiple DAGs, but it's possible to submit multiple DAGs to the driver. The changes here are related to the client side. The actual execution will be handled with [NEMO-152](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-152?filter=allopenissues).
    - Beam launches a DAG only once, as it collects everything before launching the pipeline, but Spark provides `actions` that can be invoked multiple times within an application. This change is necessary to support that behavior.
    
    **Minor changes to note:**
    - Handle SonarQube issues and fix minor typos and grammatical errors.
    
    **Tests for the changes:**
    - Add an example test program (IT case) to ensure that it works properly.
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-62](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-62)
---
 .../main/java/edu/snu/nemo/client/JobLauncher.java | 120 ++++++++++++++++-----
 .../nemo/common/ir/vertex/transform/Transform.java |   2 +-
 examples/resources/test_output_word_and_line_count |  18 ++++
 .../nemo/examples/spark/JavaWordAndLineCount.java  |  98 +++++++++++++++++
 .../java/edu/snu/nemo/examples/spark/MRJava.java   |  24 ++++-
 .../nemo/runtime/common/metric/StateMetric.java    |   3 +-
 .../common/metric/StateTransitionEvent.java        |   4 +-
 .../runtime/common/plan/PhysicalPlanGenerator.java |   1 -
 runtime/common/src/main/proto/ControlMessage.proto |   4 +-
 .../main/java/edu/snu/nemo/driver/NemoDriver.java  |  34 ++++--
 .../edu/snu/nemo/driver/UserApplicationRunner.java |   1 -
 .../edu/snu/nemo/runtime/master/MetricStore.java   |  15 ++-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |   4 +-
 13 files changed, 273 insertions(+), 55 deletions(-)

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 0b80b83..75a015b 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -25,7 +25,6 @@ import edu.snu.nemo.runtime.common.message.MessageParameters;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.client.DriverLauncher;
-import org.apache.reef.client.LauncherStatus;
 import org.apache.reef.client.parameters.JobMessageHandler;
 import org.apache.reef.io.network.naming.LocalNameResolverConfiguration;
 import org.apache.reef.io.network.naming.NameServerConfiguration;
@@ -51,6 +50,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Job launcher.
@@ -62,8 +62,14 @@ public final class JobLauncher {
   private static Configuration jobAndDriverConf = null;
   private static Configuration deployModeConf = null;
   private static Configuration builtJobConf = null;
+
+  private static DriverLauncher driverLauncher;
+  private static DriverRPCServer driverRPCServer;
+
+  private static CountDownLatch driverReadyLatch;
+  private static CountDownLatch jobDoneLatch;
   private static String serializedDAG;
-  private static List<?> collectedData = new ArrayList<>();
+  private static final List<?> COLLECTED_DATA = new ArrayList<>();
 
   /**
    * private constructor.
@@ -78,16 +84,14 @@ public final class JobLauncher {
    * @throws Exception exception on the way.
    */
   public static void main(final String[] args) throws Exception {
-    final DriverRPCServer driverRPCServer = new DriverRPCServer();
+    driverRPCServer = new DriverRPCServer();
+
     // Registers actions for launching the DAG.
     driverRPCServer
         .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> { })
-        .registerHandler(ControlMessage.DriverToClientMessageType.ResourceReady, event ->
-          driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
-              .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
-              .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
-              .build()))
-        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> collectedData.addAll(
+        .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
+        .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
+        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
             SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
         .run();
 
@@ -109,36 +113,91 @@ public final class JobLauncher {
     // Get DeployMode Conf
     deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
 
-    // Launch client main
-    runUserProgramMain(builtJobConf);
+    // Start Driver and launch user program.
+    try {
+      if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
+        throw new RuntimeException("Configuration for launching driver is not ready");
+      }
+
+      // Launch driver
+      LOG.info("Launching driver");
+      driverReadyLatch = new CountDownLatch(1);
+      driverLauncher = DriverLauncher.getLauncher(deployModeConf);
+      driverLauncher.submit(jobAndDriverConf, 500);
+      // When the driver is up and the resource is ready, the DriverReady message is delivered.
 
-    driverRPCServer.shutdown();
+      // Launch client main
+      runUserProgramMain(builtJobConf);
+
+      // Trigger driver shutdown afterwards
+      driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
+          .setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
+      // Wait for driver to naturally finish
+      synchronized (driverLauncher) {
+        while (!driverLauncher.getStatus().isDone()) {
+          try {
+            LOG.info("Wait for the driver to finish");
+            driverLauncher.wait();
+          } catch (final InterruptedException e) {
+            LOG.warn("Interrupted: " + e);
+            // clean up state...
+            Thread.currentThread().interrupt();
+          }
+        }
+        LOG.info("Driver terminated");
+      }
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    } finally {
+      // Close everything that's left
+      driverRPCServer.shutdown();
+      driverLauncher.close();
+      final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
+      if (possibleError.isPresent()) {
+        throw new RuntimeException(possibleError.get());
+      } else {
+        LOG.info("Job successfully completed");
+      }
+    }
   }
 
   /**
    * Launch application using the application DAG.
+   * Notice that we launch the DAG one at a time, as the result of a DAG has to be immediately returned to the
+   * Java variable before the application can be resumed.
    *
    * @param dag the application DAG.
    */
   // When modifying the signature of this method, see CompilerTestUtil#compileDAG and make corresponding changes
   public static void launchDAG(final DAG dag) {
+    // Wait until the driver is ready.
     try {
-      if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
-        throw new RuntimeException("Configuration for launching driver is not ready");
-      }
-      serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
-      // Launch and wait indefinitely for the job to finish
-      final LauncherStatus launcherStatus = DriverLauncher.getLauncher(deployModeConf)
-          .run(jobAndDriverConf);
-      final Optional<Throwable> possibleError = launcherStatus.getError();
-      if (possibleError.isPresent()) {
-        throw new RuntimeException(possibleError.get());
-      } else {
-        LOG.info("Job successfully completed");
-      }
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
+      LOG.info("Waiting for the driver to be ready");
+      driverReadyLatch.await();
+    } catch (final InterruptedException e) {
+      LOG.warn("Interrupted: " + e);
+      // clean up state...
+      Thread.currentThread().interrupt();
+    }
+
+    LOG.info("Launching DAG...");
+    serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
+    jobDoneLatch = new CountDownLatch(1);
+    driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
+        .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
+        .setLaunchDAG(ControlMessage.LaunchDAGMessage.newBuilder().setDag(serializedDAG).build())
+        .build());
+
+    // Wait for the ExecutionDone message from the driver
+    try {
+      LOG.info("Waiting for the DAG to finish execution");
+      jobDoneLatch.await();
+    } catch (final InterruptedException e) {
+      LOG.warn("Interrupted: " + e);
+      // clean up state...
+      Thread.currentThread().interrupt();
     }
+    LOG.info("DAG execution done");
   }
 
   /**
@@ -160,7 +219,9 @@ public final class JobLauncher {
       throw new RuntimeException("User Main Class not public");
     }
 
+    LOG.info("User program started");
     method.invoke(null, (Object) args);
+    LOG.info("User program finished");
   }
 
   /**
@@ -320,9 +381,12 @@ public final class JobLauncher {
   /**
    * Get the collected data.
    *
+   * @param <T> the type of the data.
    * @return the collected data.
    */
   public static <T> List<T> getCollectedData() {
-    return (List<T>) collectedData;
+    final List<T> result = (List<T>) new ArrayList<>(COLLECTED_DATA);
+    COLLECTED_DATA.clear(); // flush after fetching.
+    return result;
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
index db1927e..d41f9ea 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
@@ -56,7 +56,7 @@ public interface Transform<I, O> extends Serializable {
   /**
    * Context of the transform.
    */
-  interface Context {
+  interface Context extends Serializable {
     /**
      * @return sideInputs.
      */
diff --git a/examples/resources/test_output_word_and_line_count b/examples/resources/test_output_word_and_line_count
new file mode 100644
index 0000000..d3181c4
--- /dev/null
+++ b/examples/resources/test_output_word_and_line_count
@@ -0,0 +1,18 @@
+line count: 11
+
+banana: 3
+bicycle: 2
+one: 1
+girl: 1
+two: 2
+three: 3
+tennis: 3
+jy: 4
+ski: 5
+piano: 3
+wonook: 5
+shakespeare: 1
+john: 2
+jangho: 2
+sanha: 1
+snowboard: 2
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordAndLineCount.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordAndLineCount.java
new file mode 100644
index 0000000..a8ac3b5
--- /dev/null
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordAndLineCount.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.examples.spark;
+
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Java Spark word-count and line-count examples in one.
+ */
+public final class JavaWordAndLineCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  /**
+   * Private constructor: this class only hosts the static entry point.
+   */
+  private JavaWordAndLineCount() {
+  }
+
+  /**
+   * Counts the lines and the words of the input file, using a separate collect() for each result.
+   * @param args the input file path, optionally followed by the output file path.
+   * @throws Exception exceptions.
+   */
+  public static void main(final String[] args) throws Exception {
+    // The input file is mandatory; the output file is optional (see the usage string).
+    if (args.length < 1) {
+      System.err.println("Usage: JavaWordAndLineCount <input_file> [<output_file>]");
+      System.exit(1);
+    }
+
+    SparkSession spark = SparkSession
+        .builder()
+        .appName("JavaWordAndLineCount")
+        .getOrCreate();
+
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
+
+    JavaPairRDD<String, Integer> lineOnes = lines.mapToPair(s -> new Tuple2<>("line count", 1));
+
+    JavaPairRDD<String, Integer> lineCounts = lineOnes.reduceByKey((i1, i2) -> i1 + i2);
+    // First collect(): every line maps to the same key, so the result holds the total line count.
+    List<Tuple2<String, Integer>> lineOutput = lineCounts.collect();
+
+    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
+
+    JavaPairRDD<String, Integer> wordOnes = words.mapToPair(s -> new Tuple2<>(s, 1));
+
+    JavaPairRDD<String, Integer> wordCounts = wordOnes.reduceByKey((i1, i2) -> i1 + i2);
+    // Second collect(): one (word, occurrences) pair per distinct space-separated word.
+    List<Tuple2<String, Integer>> wordOutput = wordCounts.collect();
+
+    // Check the argument count: reading args[1] when it is absent throws ArrayIndexOutOfBoundsException.
+    final boolean writemode = args.length > 1;
+    if (writemode) { // print to output file
+      try (BufferedWriter bw = new BufferedWriter(new FileWriter(args[1]))) {
+        for (Tuple2<?, ?> lineTuple : lineOutput) {
+          bw.write(lineTuple._1 + ": " + lineTuple._2 + "\n\n");
+        }
+        for (Tuple2<?, ?> wordTuple : wordOutput) {
+          bw.write(wordTuple._1 + ": " + wordTuple._2 + "\n");
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    } else { // print to console.
+      for (Tuple2<?, ?> lineTuple : lineOutput) {
+        System.out.println(lineTuple._1 + ": " + lineTuple._2 + "\n");
+      }
+      for (Tuple2<?, ?> wordTuple : wordOutput) {
+        System.out.println(wordTuple._1 + ": " + wordTuple._2);
+      }
+    }
+    spark.stop();
+  }
+}
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index c4d178f..96ec20c 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -66,8 +66,30 @@ public final class MRJava {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
   }
+  /* TODO #152: enable execution of multiple jobs (call scheduleJob multiple times with caching).
+  @Test(timeout = TIMEOUT)
+  public void testSparkWordAndLineCount() throws Exception {
+    final String inputFileName = "sample_input_wordcount_spark";
+    final String outputFileName = "sample_output_word_and_line_count";
+    final String testResourceFilename = "test_output_word_and_line_count";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath = fileBasePath + outputFileName;
+
+    JobLauncher.main(builder
+        .addJobId(JavaWordAndLineCount.class.getSimpleName() + "_test")
+        .addUserMain(JavaWordAndLineCount.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFilename);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
 
-  /* Temporary disabled because of Travis issue
+  /* Temporarily disabled due to Travis issue
   @Test(timeout = TIMEOUT)
   public void testSparkMapReduce() throws Exception {
     final String inputFileName = "sample_input_wordcount_spark";
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
index 426e8e0..2744d58 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
@@ -15,13 +15,14 @@
  */
 package edu.snu.nemo.runtime.common.metric;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * Interface for metric which contians its state.
  * @param <T> class of state of the metric.
  */
-public interface StateMetric<T> extends Metric {
+public interface StateMetric<T extends Serializable> extends Metric {
   /**
    * Get its list of {@link StateTransitionEvent}.
    * @return list of events.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
index 43ce124..abf164c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
@@ -15,11 +15,13 @@
  */
 package edu.snu.nemo.runtime.common.metric;
 
+import java.io.Serializable;
+
 /**
  * Event of state transition. It contains timestamp and the state transition.
  * @param <T> class of state for the metric.
  */
-public final class StateTransitionEvent<T> extends Event {
+public final class StateTransitionEvent<T extends Serializable> extends Event {
   private T prevState;
   private T newState;
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 58c3fc6..13d8f94 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -79,7 +79,6 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
     // this is needed because of DuplicateEdgeGroupProperty.
     handleDuplicateEdgeGroupProperty(dagOfStages);
 
-
     // Split StageGroup by Pull StageEdges
     splitScheduleGroupByPullStageEdges(dagOfStages);
 
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index b0b1747..59a3212 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -21,6 +21,7 @@ option java_outer_classname = "ControlMessage";
 
 enum ClientToDriverMessageType {
     LaunchDAG = 0;
+    DriverShutdown = 1;
 }
 
 message ClientToDriverMessage {
@@ -38,8 +39,9 @@ message DataCollectMessage {
 
 enum DriverToClientMessageType {
     DriverStarted = 0;
-    ResourceReady = 1;
+    DriverReady = 1;
     DataCollected = 2;
+    ExecutionDone = 3;
 }
 
 message DriverToClientMessage {
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 4519116..ca232e0 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -73,6 +73,9 @@ public final class NemoDriver {
   private final String glusterDirectory;
   private final ClientRPC clientRPC;
 
+  private static ExecutorService runnerThread = Executors.newSingleThreadExecutor(
+      new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
+
   // Client for sending log messages
   private final RemoteClientMessageLoggingHandler handler;
 
@@ -101,8 +104,9 @@ public final class NemoDriver {
     this.clientRPC = clientRPC;
     // TODO #69: Support job-wide execution property
     NodeNamesAssignmentPass.setBandwidthSpecificationString(bandwidthString);
-    clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG,
-        message -> startSchedulingUserApplication(message.getLaunchDAG().getDag()));
+    clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.LaunchDAG, message ->
+        startSchedulingUserDAG(message.getLaunchDAG().getDag()));
+    clientRPC.registerHandler(ControlMessage.ClientToDriverMessageType.DriverShutdown, message -> shutdown());
     // Send DriverStarted message to the client
     clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
         .setType(ControlMessage.DriverToClientMessageType.DriverStarted).build());
@@ -117,6 +121,15 @@ public final class NemoDriver {
   }
 
   /**
+   * Trigger shutdown of the driver and the runtime master.
+   */
+  private void shutdown() {
+    LOG.info("Driver shutdown initiated");
+    runnerThread.execute(runtimeMaster::terminate);
+    runnerThread.shutdown();
+  }
+
+  /**
    * Driver started.
    */
   public final class StartHandler implements EventHandler<StartTime> {
@@ -149,20 +162,21 @@ public final class NemoDriver {
 
       if (finalExecutorLaunched) {
         clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
-            .setType(ControlMessage.DriverToClientMessageType.ResourceReady).build());
+            .setType(ControlMessage.DriverToClientMessageType.DriverReady).build());
       }
     }
   }
 
   /**
-   * Start user application.
+   * Start user DAG.
    */
-  public void startSchedulingUserApplication(final String dagString) {
-    // Launch user application (with a new thread)
-    final ExecutorService userApplicationRunnerThread = Executors.newSingleThreadExecutor(
-        new BasicThreadFactory.Builder().namingPattern("User App thread-%d").build());
-    userApplicationRunnerThread.execute(() -> userApplicationRunner.run(dagString));
-    userApplicationRunnerThread.shutdown();
+  public void startSchedulingUserDAG(final String dagString) {
+    runnerThread.execute(() -> {
+      userApplicationRunner.run(dagString);
+      // send driver notification that user application is done.
+      clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
+          .setType(ControlMessage.DriverToClientMessageType.ExecutionDone).build());
+    });
   }
 
   /**
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 4ab57a2..aad8185 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -120,7 +120,6 @@ public final class UserApplicationRunner {
 
       jobStateManager.storeJSON(dagDirectory, "final");
       LOG.info("{} is complete!", physicalPlan.getId());
-      runtimeMaster.terminate();
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
index 09c7c9d..1856389 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
@@ -33,18 +33,15 @@ import java.util.*;
 public final class MetricStore {
   private final Map<Class, Map<String, Object>> metricMap = new HashMap<>();
   // You can add more metrics by adding item to this metricList list.
-  private final Map<String, Class> metricList = new HashMap<String, Class>() {
-    {
-      put("JobMetric", JobMetric.class);
-      put("StageMetric", StageMetric.class);
-      put("TaskMetric", TaskMetric.class);
-    }
-  };
-
+  private final Map<String, Class> metricList = new HashMap<>();
   /**
    * Private constructor.
    */
-  private MetricStore() { }
+  private MetricStore() {
+    metricList.put("JobMetric", JobMetric.class);
+    metricList.put("StageMetric", StageMetric.class);
+    metricList.put("TaskMetric", TaskMetric.class);
+  }
 
   /**
    * Getter for singleton instance.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 454e91b..91c385c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -186,7 +186,9 @@ public final class RuntimeMaster {
         LOG.warn("Terminating master before all executor terminated messages arrived.");
       }
     } catch (final InterruptedException e) {
-      LOG.warn("Waiting executor terminating process interrupted.");
+      LOG.warn("Waiting executor terminating process interrupted: " + e);
+      // clean up state...
+      Thread.currentThread().interrupt();
     }
     runtimeMasterThread.execute(() -> {
       scheduler.terminate();