You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2020/03/24 23:18:35 UTC

[incubator-nemo] branch master updated: [NEMO-324] Distinguish Beam's run and waitUntilFinish methods (#187)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb02fa  [NEMO-324] Distinguish Beam's run and waitUntilFinish methods (#187)
dcb02fa is described below

commit dcb02faf3385c3f4a7fe59b186dbf7872b553769
Author: Won Wook SONG <wo...@apache.org>
AuthorDate: Wed Mar 25 08:18:24 2020 +0900

    [NEMO-324] Distinguish Beam's run and waitUntilFinish methods (#187)
    
    JIRA: [NEMO-324: Distinguish Beam's run and waitUntilFinish methods](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-324)
    
    **Major changes:**
    - Gets rid of the UnsupportedOperation, makes `waitUntilFinish` method to a meaningful one.
    - Runs the application asynchronously on the `run` method.
    
    **Minor changes to note:**
    - Add the examples output directory to the gitignore.
    
    **Tests for the changes:**
    - I've added a same word count program that has a timeout of 1 second, and a test to confirm that it aborts after 1 second.
    - Existing tests cover other changes.
    
    **Other comments:**
    - None
    
    Closes #187
---
 .gitignore                                         |  1 +
 .../org/apache/nemo/client/ClientEndpoint.java     | 39 ++----------
 .../org/apache/nemo/client/DriverEndpoint.java     |  2 +-
 .../java/org/apache/nemo/client/JobLauncher.java   | 12 +++-
 .../nemo/client/beam/NemoPipelineResult.java       | 44 ++++++++++++--
 .../org/apache/nemo/client/beam/NemoRunner.java    |  6 +-
 .../common/exception/OutputMismatchException.java  | 44 ++++++++++++++
 .../apache/nemo/common/test/ExampleTestUtil.java   |  6 +-
 .../nemo/examples/beam/AlternatingLeastSquare.java |  2 +-
 .../beam/AlternatingLeastSquareInefficient.java    |  2 +-
 .../org/apache/nemo/examples/beam/Broadcast.java   |  2 +-
 .../nemo/examples/beam/MinimalWordCount.java       |  2 +-
 .../beam/MultinomialLogisticRegression.java        |  2 +-
 .../nemo/examples/beam/NetworkTraceAnalysis.java   |  2 +-
 .../nemo/examples/beam/PartitionWordsByLength.java |  2 +-
 .../apache/nemo/examples/beam/PerKeyMedian.java    |  2 +-
 .../nemo/examples/beam/PerPercentileAverage.java   |  2 +-
 .../apache/nemo/examples/beam/SimpleSumSQL.java    |  2 +-
 .../nemo/examples/beam/WindowedBroadcast.java      |  2 +-
 .../nemo/examples/beam/WindowedWordCount.java      |  2 +-
 .../org/apache/nemo/examples/beam/WordCount.java   | 15 ++++-
 .../nemo/examples/beam/WordCountTimeOut1Sec.java   | 64 ++++++++++++++++++++
 .../apache/nemo/examples/beam/TimeoutITCase.java   | 69 ++++++++++++++++++++++
 .../apache/nemo/driver/UserApplicationRunner.java  |  6 +-
 .../nemo/runtime/master/PlanStateManager.java      | 15 +----
 25 files changed, 276 insertions(+), 71 deletions(-)

diff --git a/.gitignore b/.gitignore
index 069c859..1795a60 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,6 +50,7 @@ atlassian-ide-plugin.xml
 # ----------------------------------------------------------------------
 # Temporary Files
 # ----------------------------------------------------------------------
+outputs/*
 tmp
 *~
 \#*
diff --git a/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java b/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java
index 257d23a..750b210 100644
--- a/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java
+++ b/client/src/main/java/org/apache/nemo/client/ClientEndpoint.java
@@ -109,20 +109,7 @@ public abstract class ClientEndpoint {
    * @return {@code true} if the manager set.
    */
   private boolean waitUntilConnected() {
-    connectionLock.lock();
-    try {
-      if (driverEndpoint.get() == null) {
-        // If the driver endpoint is not connected, wait.
-        driverConnected.await();
-      }
-      return true;
-    } catch (final InterruptedException e) {
-      e.printStackTrace(System.err);
-      Thread.currentThread().interrupt();
-      return false;
-    } finally {
-      connectionLock.unlock();
-    }
+    return waitUntilConnected(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -153,19 +140,16 @@ public abstract class ClientEndpoint {
     } else {
       // The driver endpoint is not connected yet.
       final long currentNano = System.nanoTime();
-      final boolean driverIsConnected;
-      if (DEFAULT_DRIVER_WAIT_IN_MILLIS < unit.toMillis(timeout)) {
-        driverIsConnected = waitUntilConnected(DEFAULT_DRIVER_WAIT_IN_MILLIS, TimeUnit.MILLISECONDS);
-      } else {
-        driverIsConnected = waitUntilConnected(timeout, unit);
-      }
+      final boolean driverIsConnected =
+        waitUntilConnected(Math.min(DEFAULT_DRIVER_WAIT_IN_MILLIS, unit.toMillis(timeout)), TimeUnit.MILLISECONDS);
 
       if (driverIsConnected) {
         final long consumedTime = System.nanoTime() - currentNano;
         return stateTranslator.translateState(driverEndpoint.get().
           waitUntilFinish(timeout - unit.convert(consumedTime, TimeUnit.NANOSECONDS), unit));
       } else {
-        return PlanState.State.READY;
+        // Driver is not connected.
+        return stateTranslator.translateState(PlanState.State.READY);
       }
     }
   }
@@ -176,17 +160,6 @@ public abstract class ClientEndpoint {
    * @return the final state of this job.
    */
   public final Enum waitUntilJobFinish() {
-    if (driverEndpoint.get() != null) {
-      return stateTranslator.translateState(driverEndpoint.get().waitUntilFinish());
-    } else {
-      // The driver endpoint is not connected yet.
-      final boolean driverIsConnected = waitUntilConnected();
-
-      if (driverIsConnected) {
-        return stateTranslator.translateState(driverEndpoint.get().waitUntilFinish());
-      } else {
-        return PlanState.State.READY;
-      }
-    }
+    return waitUntilJobFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
   }
 }
diff --git a/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java b/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java
index 6deb8e6..35edea7 100644
--- a/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java
+++ b/client/src/main/java/org/apache/nemo/client/DriverEndpoint.java
@@ -83,6 +83,6 @@ public final class DriverEndpoint {
    * @return the final state of this plan.
    */
   PlanState.State waitUntilFinish() {
-    return planStateManager.waitUntilFinish();
+    return waitUntilFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
   }
 }
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 9873f16..acd9a22 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -86,6 +86,7 @@ public final class JobLauncher {
   private static DriverLauncher driverLauncher;
   private static DriverRPCServer driverRPCServer;
 
+  private static boolean isSetUp = false;
   private static CountDownLatch driverReadyLatch;
   private static CountDownLatch jobDoneLatch;
   private static String serializedDAG;
@@ -170,6 +171,8 @@ public final class JobLauncher {
     // Launch driver
     LOG.info("Launching driver");
     driverReadyLatch = new CountDownLatch(1);
+    jobDoneLatch = new CountDownLatch(1);
+    isSetUp = true;
     driverLauncher = DriverLauncher.getLauncher(deployModeConf);
     driverLauncher.submit(jobAndDriverConf, 500);
     // When the driver is up and the resource is ready, the DriverReady message is delivered.
@@ -201,9 +204,12 @@ public final class JobLauncher {
     // Close everything that's left
     driverRPCServer.shutdown();
     driverLauncher.close();
+    isSetUp = false;
     final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
     if (possibleError.isPresent()) {
       throw new RuntimeException(possibleError.get());
+    } else if (jobDoneLatch.getCount() > 0) {
+      LOG.info("Job cancelled");
     } else {
       LOG.info("Job successfully completed");
     }
@@ -261,7 +267,7 @@ public final class JobLauncher {
                                final Map<Serializable, Object> broadcastVariables,
                                final String jobId) {
     // launch driver if it hasn't been already
-    if (driverReadyLatch == null) {
+    if (!isSetUp) {
       try {
         setup(new String[]{"-job_id", jobId});
       } catch (Exception e) {
@@ -281,7 +287,9 @@ public final class JobLauncher {
 
     LOG.info("Launching DAG...");
     serializedDAG = Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
-    jobDoneLatch = new CountDownLatch(1);
+    if (jobDoneLatch.getCount() == 0) {  // when this is not the first execution.
+      jobDoneLatch = new CountDownLatch(1);
+    }
 
     driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
       .setType(ControlMessage.ClientToDriverMessageType.LaunchDAG)
diff --git a/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
index d983521..d360699 100644
--- a/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoPipelineResult.java
@@ -21,20 +21,35 @@ package org.apache.nemo.client.beam;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.nemo.client.ClientEndpoint;
+import org.apache.nemo.client.JobLauncher;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Beam result.
  */
 public final class NemoPipelineResult extends ClientEndpoint implements PipelineResult {
+  private static final Logger LOG = LoggerFactory.getLogger(NemoPipelineResult.class.getName());
+  private final CountDownLatch jobDone;
 
   /**
    * Default constructor.
    */
   public NemoPipelineResult() {
     super(new BeamStateTranslator());
+    this.jobDone = new CountDownLatch(1);
+  }
+
+  /**
+   * Signal that the job is finished to the NemoPipelineResult object.
+   */
+  public void setJobDone() {
+    this.jobDone.countDown();
   }
 
   @Override
@@ -44,14 +59,35 @@ public final class NemoPipelineResult extends ClientEndpoint implements Pipeline
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("cancel() in frontend.beam.NemoPipelineResult");
+    try {
+      JobLauncher.shutdown();
+      return State.CANCELLED;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public State waitUntilFinish(final Duration duration) {
-    throw new UnsupportedOperationException();
-    // TODO #208: NemoPipelineResult#waitUntilFinish hangs
-    // Previous code that hangs the job:
+    try {
+      if (duration.getMillis() < 1) {
+        this.jobDone.await();
+        return State.DONE;
+      } else {
+        final boolean finished = this.jobDone.await(duration.getMillis(), TimeUnit.MILLISECONDS);
+        if (finished) {
+          LOG.info("Job successfully finished before timeout of {}ms, while waiting until finish",
+            duration.getMillis());
+          return State.DONE;
+        } else {
+          LOG.warn("Job timed out before {}ms, while waiting until finish. Call 'cancel' to cancel the job.",
+            duration.getMillis());
+          return State.RUNNING;
+        }
+      }
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
     // return (State) super.waitUntilJobFinish(duration.getMillis(), TimeUnit.MILLISECONDS);
   }
 
diff --git a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
index 81aa18d..bf3323d 100644
--- a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
+++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java
@@ -27,6 +27,8 @@ import org.apache.nemo.client.JobLauncher;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.apache.nemo.compiler.frontend.beam.PipelineVisitor;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Runner class for BEAM programs.
  */
@@ -84,7 +86,9 @@ public final class NemoRunner extends PipelineRunner<NemoPipelineResult> {
     final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions);
     pipeline.traverseTopologically(pipelineVisitor);
     final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
-    JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName());
+    CompletableFuture.runAsync(() ->
+      JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName()))
+      .thenRun(nemoPipelineResult::setJobDone);
     return nemoPipelineResult;
   }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/exception/OutputMismatchException.java b/common/src/main/java/org/apache/nemo/common/exception/OutputMismatchException.java
new file mode 100644
index 0000000..70e6d5c
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/exception/OutputMismatchException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.common.exception;
+
+/**
+ * OutputMismatchException.
+ * Thrown in ITCases where output doesn't match the expected outputs.
+ */
+public class OutputMismatchException extends RuntimeException {
+  /**
+   * Constructor of OutputMismatchException.
+   *
+   * @param cause cause.
+   */
+  public OutputMismatchException(final Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Constructor of OutputMismatchException.
+   *
+   * @param message message.
+   */
+  public OutputMismatchException(final String message) {
+    super(message);
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java b/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java
index 08691af..6a41449 100644
--- a/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java
+++ b/common/src/main/java/org/apache/nemo/common/test/ExampleTestUtil.java
@@ -18,6 +18,8 @@
  */
 package org.apache.nemo.common.test;
 
+import org.apache.nemo.common.exception.OutputMismatchException;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -63,7 +65,7 @@ public final class ExampleTestUtil {
           try {
             return Files.lines(path);
           } catch (final IOException e) {
-            throw new RuntimeException(e);
+            throw new OutputMismatchException(e);
           }
         })
         .sorted()
@@ -87,7 +89,7 @@ public final class ExampleTestUtil {
           + "\n=============" + testResourceFileName + "=================="
           + resourceOutput
           + "\n===============================";
-      throw new RuntimeException(outputMsg);
+      throw new OutputMismatchException(outputMsg);
     }
   }
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 5b878d9..d87d546 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -423,7 +423,7 @@ public final class AlternatingLeastSquare {
       GenericSourceSink.write(result, outputFilePath);
     }
 
-    p.run();
+    p.run().waitUntilFinish();
     LOG.info("JCT " + (System.currentTimeMillis() - start));
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index 3aa95c4..fd9f994 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -159,7 +159,7 @@ public final class AlternatingLeastSquareInefficient {
       itemMatrix = itemMatrix.apply(new UpdateUserAndItemMatrix(numFeatures, lambda, rawData, parsedItemData));
     }
 
-    p.run();
+    p.run().waitUntilFinish();
     LOG.info("JCT " + (System.currentTimeMillis() - start));
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
index ad7f8e2..46d67fc 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
@@ -70,6 +70,6 @@ public final class Broadcast {
     );
 
     GenericSourceSink.write(result, outputFilePath);
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
index 4ac1085..68e0777 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
@@ -82,6 +82,6 @@ public final class MinimalWordCount {
       //
       // By default, it will write to a set of files with names like wordcounts-00001-of-00005
       .apply(TextIO.write().to(outputFilePath));
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 3efbd72..b46ab77 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -459,7 +459,7 @@ public final class MultinomialLogisticRegression {
       model = model.apply(new UpdateModel(numFeatures, numClasses, i, readInput));
     }
 
-    p.run();
+    p.run().waitUntilFinish();
     LOG.info("JCT " + (System.currentTimeMillis() - start));
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
index aa56246..c652f46 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -121,7 +121,7 @@ public final class NetworkTraceAnalysis {
         }
       }));
     GenericSourceSink.write(result, outputFilePath);
-    p.run();
+    p.run().waitUntilFinish();
   }
 
   /**
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
index 7845b18..48b2ce9 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
@@ -95,7 +95,7 @@ public final class PartitionWordsByLength {
     GenericSourceSink.write(longWords, outputFilePath + "_long");
     GenericSourceSink.write(veryLongWords, outputFilePath + "_very_long");
     GenericSourceSink.write(veryVeryLongWords, outputFilePath + "_very_very_long");
-    p.run();
+    p.run().waitUntilFinish();
   }
 
   /**
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
index 5161bcd..3461b23 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
@@ -80,7 +80,7 @@ public final class PerKeyMedian {
         }
       }));
     GenericSourceSink.write(result, outputFilePath);
-    p.run();
+    p.run().waitUntilFinish();
 
     LOG.info("*******END*******");
     LOG.info("JCT(ms): " + (System.currentTimeMillis() - start));
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
index c6fb382..1973970 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
@@ -93,7 +93,7 @@ public final class PerPercentileAverage {
       GenericSourceSink.write(results[i], outputFilePath + "_" + i);
     }
 
-    p.run();
+    p.run().waitUntilFinish();
   }
 
   /**
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index 8be20f3..4fd3e80 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -88,6 +88,6 @@ public final class SimpleSumSQL {
       }
     })), outputFilePath);
 
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
index 36ce9d1..890a629 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -89,6 +89,6 @@ public final class WindowedBroadcast {
       }).withSideInputs(windowedView)
     ).apply(new WriteOneFilePerWindow(outputFilePath, 1));
 
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 453c4d2..490f300 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -125,6 +125,6 @@ public final class WindowedWordCount {
       }))
       .apply(new WriteOneFilePerWindow(outputFilePath, 1));
 
-    p.run();
+    p.run().waitUntilFinish();
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
index dcc5485..367938f 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
@@ -47,6 +47,19 @@ public final class WordCount {
     final PipelineOptions options = NemoPipelineOptionsFactory.create();
     options.setJobName("WordCount");
 
+    final Pipeline p = generateWordCountPipeline(options, inputFilePath, outputFilePath);
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * Static method to generate the word count Beam pipeline.
+   * @param options options for the pipeline.
+   * @param inputFilePath the input file path.
+   * @param outputFilePath the output file path.
+   * @return the generated pipeline.
+   */
+  static Pipeline generateWordCountPipeline(final PipelineOptions options,
+                                                   final String inputFilePath, final String outputFilePath) {
     final Pipeline p = Pipeline.create(options);
     final PCollection<String> result = GenericSourceSink.read(p, inputFilePath)
       .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
@@ -66,6 +79,6 @@ public final class WordCount {
         }
       }));
     GenericSourceSink.write(result, outputFilePath);
-    p.run();
+    return p;
   }
 }
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCountTimeOut1Sec.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCountTimeOut1Sec.java
new file mode 100644
index 0000000..20281f3
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCountTimeOut1Sec.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.nemo.examples.beam.WordCount.generateWordCountPipeline;
+
+/**
+ * WordCount application, but with a timeout of 1 second.
+ */
+public final class WordCountTimeOut1Sec {
+  private static final Logger LOG = LoggerFactory.getLogger(WordCountTimeOut1Sec.class.getName());
+
+  /**
+   * Private constructor.
+   */
+  private WordCountTimeOut1Sec() {
+  }
+
+  /**
+   * Main function for the MR BEAM program.
+   *
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final PipelineOptions options = NemoPipelineOptionsFactory.create();
+    options.setJobName("WordCountTimeOut1Sec");
+
+    final Pipeline p = generateWordCountPipeline(options, inputFilePath, outputFilePath);
+    final PipelineResult pr = p.run();
+    final PipelineResult.State running = pr.waitUntilFinish(org.joda.time.Duration.standardSeconds(1));
+    try {
+      final PipelineResult.State cancelled = pr.cancel();
+    } catch (final IOException e) {
+      LOG.info("IOException while cancelling job");
+    }
+  }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/TimeoutITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/TimeoutITCase.java
new file mode 100644
index 0000000..23e024e
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/TimeoutITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.exception.OutputMismatchException;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestArgs;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test WordCount program with JobLauncher, but with a timeout.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public class TimeoutITCase {
+  private static ArgBuilder builder;
+
+  private static final String inputFileName = "inputs/test_input_wordcount";
+  private static final String outputFileName = "test_output_wordcount";
+  private static final String expectedOutputFileName = "outputs/expected_output_wordcount";
+  private static final String executorResourceFileName = ExampleTestArgs.getFileBasePath() + "executors/beam_test_executor_resources.json";
+  private static final String inputFilePath = ExampleTestArgs.getFileBasePath() + inputFileName;
+  private static final String outputFilePath = ExampleTestArgs.getFileBasePath() + outputFileName;
+
+  @Before
+  public void setUp() throws Exception {
+    builder = new ArgBuilder()
+      .addUserMain(WordCountTimeOut1Sec.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath);
+  }
+
+  @Test(timeout = ExampleTestArgs.TIMEOUT, expected = OutputMismatchException.class)
+  public void testTimeout() throws Exception {
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(TimeoutITCase.class.getSimpleName() + "_wordcount")
+      .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
+    try {
+      ExampleTestUtil.ensureOutputValidity(ExampleTestArgs.getFileBasePath(), outputFileName, expectedOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(ExampleTestArgs.getFileBasePath(), outputFileName);
+    }
+  }
+}
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
index b3ac05b..1bdefb8 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/UserApplicationRunner.java
@@ -28,6 +28,7 @@ import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.metric.JobMetric;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.plan.PlanRewriter;
+import org.apache.nemo.runtime.common.state.PlanState;
 import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.RuntimeMaster;
 import org.apache.nemo.runtime.master.metric.MetricStore;
@@ -100,15 +101,16 @@ public final class UserApplicationRunner {
       // Wait for the job to finish and stop logging
       final PlanStateManager planStateManager = executionResult.left();
       final ScheduledExecutorService dagLoggingExecutor = executionResult.right();
+      final PlanState.State state;
       try {
-        planStateManager.waitUntilFinish();
+        state = planStateManager.waitUntilFinish();
         dagLoggingExecutor.shutdown();
       } finally {
         planStateManager.storeJSON("final");
       }
 
       final long endTime = System.currentTimeMillis();
-      LOG.info("{} is complete!", physicalPlan.getPlanId());
+      LOG.info("{} is complete, with final status {}!", physicalPlan.getPlanId(), state);
       MetricStore.getStore().getOrCreateMetric(JobMetric.class, physicalPlan.getPlanId())
         .setJobDuration(endTime - startTime);
     } catch (final Exception e) {
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
index 5841adb..53cab57 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java
@@ -458,18 +458,7 @@ public final class PlanStateManager {
    * @return the final state of this plan.
    */
   public PlanState.State waitUntilFinish() {
-    finishLock.lock();
-    try {
-      while (!isPlanDone()) {
-        planFinishedCondition.await();
-      }
-    } catch (final InterruptedException e) {
-      LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
-      Thread.currentThread().interrupt();
-    } finally {
-      finishLock.unlock();
-    }
-    return getPlanState();
+    return waitUntilFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -490,7 +479,7 @@ public final class PlanStateManager {
         }
       }
     } catch (final InterruptedException e) {
-      LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
+      LOG.warn("Interrupted while waiting for the finish of Plan ID {}", planId);
       Thread.currentThread().interrupt();
     } finally {
       finishLock.unlock();