You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/29 03:00:38 UTC

[2/2] incubator-beam git commit: [BEAM-443] Add waitUntilFinish() and cancel() in PipelineResult.

[BEAM-443] Add waitUntilFinish() and cancel() in PipelineResult.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b4b7c76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b4b7c76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b4b7c76

Branch: refs/heads/master
Commit: 1b4b7c762825bba90b80b4548bc442ae9c813dca
Parents: 1df6f5f
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 19:42:02 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 28 20:00:27 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/common/ExampleUtils.java      |  4 +-
 .../beam/runners/direct/DirectRunner.java       | 18 +++++
 .../beam/runners/flink/FlinkRunnerResult.java   | 22 ++++-
 .../dataflow/BlockingDataflowRunner.java        |  5 +-
 .../runners/dataflow/DataflowPipelineJob.java   | 85 +++++++++++---------
 .../beam/runners/dataflow/DataflowRunner.java   |  5 +-
 .../dataflow/testing/TestDataflowRunner.java    |  7 +-
 .../dataflow/BlockingDataflowRunnerTest.java    |  9 +--
 .../dataflow/DataflowPipelineJobTest.java       | 43 ++++++----
 .../testing/TestDataflowRunnerTest.java         | 34 ++++----
 .../spark/translation/EvaluationContext.java    | 21 +++++
 .../org/apache/beam/sdk/PipelineResult.java     | 38 +++++++++
 .../main/java/common/DataflowExampleUtils.java  |  5 +-
 13 files changed, 202 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 8f9be31..8b66861 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -306,12 +306,12 @@ public class ExampleUtils {
         addShutdownHook(jobsToCancel);
       }
       try {
-        job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
+        job.waitUntilFinish();
       } catch (Exception e) {
         throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
       }
     } else {
-      // Do nothing if the given PipelineResult doesn't support waitToFinish(),
+      // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
       // such as EvaluationResults returned by DirectRunner.
       tearDown();
       printPendingMessages();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 72194da..743c565 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -51,8 +51,10 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -362,5 +364,21 @@ public class DirectRunner
       }
       return state;
     }
+
+    @Override
+    public State cancel() throws IOException {
+      throw new UnsupportedOperationException("DirectPipelineResult does not support cancel.");
+    }
+
+    @Override
+    public State waitUntilFinish() throws IOException {
+      return waitUntilFinish(Duration.millis(-1));
+    }
+
+    @Override
+    public State waitUntilFinish(Duration duration) throws IOException {
+      throw new UnsupportedOperationException(
+          "DirectPipelineResult does not support waitUntilFinish.");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index a8f4cac..cae0b2a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -22,6 +22,9 @@ import org.apache.beam.sdk.runners.AggregatorRetrievalException;
 import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 
+import org.joda.time.Duration;
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 
@@ -31,16 +34,12 @@ import java.util.Map;
  * {@link org.apache.beam.sdk.transforms.Aggregator}s.
  */
 public class FlinkRunnerResult implements PipelineResult {
-  
   private final Map<String, Object> aggregators;
-  
   private final long runtime;
-  
   public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
     this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
         Collections.<String, Object>emptyMap() :
         Collections.unmodifiableMap(aggregators);
-    
     this.runtime = runtime;
   }
 
@@ -73,4 +72,19 @@ public class FlinkRunnerResult implements PipelineResult {
         ", runtime=" + runtime +
         '}';
   }
+
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel.");
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return waitUntilFinish(Duration.millis(-1));
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    throw new UnsupportedOperationException("FlinkRunnerResult does not support waitUntilFinish.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
index f7f7dc8..e7cd67e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
@@ -29,11 +29,11 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
@@ -116,8 +116,7 @@ public class BlockingDataflowRunner extends
       @Nullable
       State result;
       try {
-        result = job.waitToFinish(
-            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
+        result = job.waitUntilFinish(Duration.standardSeconds(BUILTIN_JOB_TIMEOUT_SEC));
       } catch (IOException | InterruptedException ex) {
         if (ex instanceof InterruptedException) {
           Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 1b3dd43..a02d280 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -21,6 +21,7 @@ import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
 
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
 import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.runners.AggregatorRetrievalException;
@@ -35,13 +36,13 @@ import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.NanoClock;
 import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.annotations.VisibleForTesting;
 
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +75,7 @@ public class DataflowPipelineJob implements PipelineResult {
    * Client for the Dataflow service. This can be used to query the service
    * for information about the job.
    */
-  private Dataflow dataflowClient;
+  private DataflowPipelineOptions dataflowOptions;
 
   /**
    * The state the job terminated in or {@code null} if the job has not terminated.
@@ -112,13 +113,17 @@ public class DataflowPipelineJob implements PipelineResult {
    *
    * @param projectId the project id
    * @param jobId the job id
-   * @param dataflowClient the client for the Dataflow Service
+   * @param dataflowOptions the client for the Dataflow Service
+   * @param aggregatorTransforms a mapping from aggregators to PTransforms
    */
-  public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
+  public DataflowPipelineJob(
+      String projectId,
+      String jobId,
+      DataflowPipelineOptions dataflowOptions,
       DataflowAggregatorTransforms aggregatorTransforms) {
     this.projectId = projectId;
     this.jobId = jobId;
-    this.dataflowClient = dataflowClient;
+    this.dataflowOptions = dataflowOptions;
     this.aggregatorTransforms = aggregatorTransforms;
   }
 
@@ -152,12 +157,25 @@ public class DataflowPipelineJob implements PipelineResult {
     return replacedByJob;
   }
 
+  @Override
+  @Nullable
+  public State waitUntilFinish() throws IOException, InterruptedException {
+    return waitUntilFinish(Duration.millis(-1));
+  }
+
+  @Override
+  @Nullable
+  public State waitUntilFinish(Duration duration)
+          throws IOException, InterruptedException {
+    return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
+  }
+
   /**
-   * Waits for the job to finish and return the final status.
+   * Waits until the pipeline finishes and returns the final status.
    *
-   * @param timeToWait The time to wait in units timeUnit for the job to finish.
+   * @param duration The time to wait for the job to finish.
    *     Provide a value less than 1 ms for an infinite wait.
-   * @param timeUnit The unit of time for timeToWait.
+   *
    * @param messageHandler If non null this handler will be invoked for each
    *   batch of messages received.
    * @return The final state of the job or null on timeout or if the
@@ -167,48 +185,45 @@ public class DataflowPipelineJob implements PipelineResult {
    * @throws InterruptedException
    */
   @Nullable
-  public State waitToFinish(
-      long timeToWait,
-      TimeUnit timeUnit,
-      MonitoringUtil.JobMessagesHandler messageHandler)
-          throws IOException, InterruptedException {
-    return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+  @VisibleForTesting
+  public State waitUntilFinish(
+      Duration duration,
+      MonitoringUtil.JobMessagesHandler messageHandler) throws IOException, InterruptedException {
+    return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
   }
 
   /**
-   * Wait for the job to finish and return the final status.
+   * Waits until the pipeline finishes and returns the final status.
    *
-   * @param timeToWait The time to wait in units timeUnit for the job to finish.
+   * @param duration The time to wait for the job to finish.
    *     Provide a value less than 1 ms for an infinite wait.
-   * @param timeUnit The unit of time for timeToWait.
+   *
    * @param messageHandler If non null this handler will be invoked for each
    *   batch of messages received.
    * @param sleeper A sleeper to use to sleep between attempts.
    * @param nanoClock A nanoClock used to time the total time taken.
-   * @return The final state of the job or null on timeout or if the
-   *   thread is interrupted.
+   * @return The final state of the job or null on timeout.
    * @throws IOException If there is a persistent problem getting job
    *   information.
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted.
    */
   @Nullable
   @VisibleForTesting
-  State waitToFinish(
-      long timeToWait,
-      TimeUnit timeUnit,
+  State waitUntilFinish(
+      Duration duration,
       MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
       NanoClock nanoClock)
           throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
+    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
 
     long lastTimestamp = 0;
     BackOff backoff =
-        timeUnit.toMillis(timeToWait) > 0
+        duration.getMillis() > 0
             ? new AttemptAndTimeBoundedExponentialBackOff(
                 MESSAGES_POLLING_ATTEMPTS,
                 MESSAGES_POLLING_INTERVAL,
-                timeUnit.toMillis(timeToWait),
+                duration.getMillis(),
                 AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
                 nanoClock)
             : new AttemptBoundedExponentialBackOff(
@@ -250,18 +265,16 @@ public class DataflowPipelineJob implements PipelineResult {
     return null;  // Timed out.
   }
 
-  /**
-   * Cancels the job.
-   * @throws IOException if there is a problem executing the cancel request.
-   */
-  public void cancel() throws IOException {
+  @Override
+  public State cancel() throws IOException {
     Job content = new Job();
     content.setProjectId(projectId);
     content.setId(jobId);
     content.setRequestedState("JOB_STATE_CANCELLED");
-    dataflowClient.projects().jobs()
+    dataflowOptions.getDataflowClient().projects().jobs()
         .update(projectId, jobId, content)
         .execute();
+    return State.CANCELLED;
   }
 
   @Override
@@ -315,7 +328,7 @@ public class DataflowPipelineJob implements PipelineResult {
     // Retry loop ends in return or throw
     while (true) {
       try {
-        Job job = dataflowClient
+        Job job = dataflowOptions.getDataflowClient()
             .projects()
             .jobs()
             .get(projectId, jobId)
@@ -324,7 +337,7 @@ public class DataflowPipelineJob implements PipelineResult {
         if (currentState.isTerminal()) {
           terminalState = currentState;
           replacedByJob = new DataflowPipelineJob(
-              getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
+              getProjectId(), job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
         }
         return job;
       } catch (IOException exn) {
@@ -371,8 +384,8 @@ public class DataflowPipelineJob implements PipelineResult {
         metricUpdates = terminalMetricUpdates;
       } else {
         boolean terminal = getState().isTerminal();
-        JobMetrics jobMetrics =
-            dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
+        JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
+            .projects().jobs().getMetrics(projectId, jobId).execute();
         metricUpdates = jobMetrics.getMetrics();
         if (terminal && jobMetrics.getMetrics() != null) {
           terminalMetricUpdates = metricUpdates;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8f9e76e..7191fe8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -606,9 +606,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     // Use a raw client for post-launch monitoring, as status calls may fail
     // regularly and need not be retried automatically.
-    DataflowPipelineJob dataflowPipelineJob =
-        new DataflowPipelineJob(options.getProject(), jobResult.getId(),
-            options.getDataflowClient(), aggregatorTransforms);
+    DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(
+        options.getProject(), jobResult.getId(), options, aggregatorTransforms);
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index b60e1be..f74f4dd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -41,6 +41,8 @@ import com.google.api.services.dataflow.model.MetricUpdate;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +52,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 /**
  * {@link TestDataflowRunner} is a pipeline runner that wraps a
@@ -131,7 +132,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             }
           }
         });
-        State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
+        State finalState = job.waitUntilFinish(Duration.standardMinutes(10L), messageHandler);
         if (finalState == null || finalState == State.RUNNING) {
           LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
               job.getJobId());
@@ -139,7 +140,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         }
         result = resultFuture.get();
       } else {
-        job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
+        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
         result = checkForSuccess(job);
       }
       if (!result.isPresent()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
index 7be074e..7bdac3d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
@@ -19,14 +19,13 @@ package org.apache.beam.runners.dataflow;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -40,14 +39,13 @@ import org.hamcrest.Description;
 import org.hamcrest.Factory;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * Tests for BlockingDataflowRunner.
  */
@@ -181,8 +179,7 @@ public class BlockingDataflowRunnerTest {
     DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
     when(mockJob.getProjectId()).thenReturn(projectId);
     when(mockJob.getJobId()).thenReturn(jobId);
-    when(mockJob.waitToFinish(
-        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
+    when(mockJob.waitUntilFinish(any(Duration.class)))
         .thenReturn(terminalState);
     return mockJob;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 80b7e7b..343d538 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -33,8 +33,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AggregatorRetrievalException;
 import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
@@ -59,6 +61,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSetMultimap;
 
+import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -93,12 +96,17 @@ public class DataflowPipelineJobTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  private TestDataflowPipelineOptions options;
+
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
 
     when(mockWorkflowClient.projects()).thenReturn(mockProjects);
     when(mockProjects.jobs()).thenReturn(mockJobs);
+
+    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setDataflowClient(mockWorkflowClient);
   }
 
   /**
@@ -146,9 +154,10 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
 
-    State state = job.waitToFinish(5, TimeUnit.MINUTES, jobHandler, fastClock, fastClock);
+    State state = job.waitUntilFinish(
+        Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
     assertEquals(null, state);
   }
 
@@ -164,9 +173,9 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
 
-    return job.waitToFinish(1, TimeUnit.MINUTES, null, fastClock, fastClock);
+    return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
   }
 
   /**
@@ -215,10 +224,10 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
 
     long startTime = fastClock.nanoTime();
-    State state = job.waitToFinish(5, TimeUnit.MINUTES, null, fastClock, fastClock);
+    State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
     assertEquals(null, state);
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL,
@@ -235,9 +244,9 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
     long startTime = fastClock.nanoTime();
-    State state = job.waitToFinish(4, TimeUnit.MILLISECONDS, null, fastClock, fastClock);
+    State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
     assertEquals(null, state);
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     // Should only sleep for the 4 ms remaining.
@@ -258,7 +267,7 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
 
     assertEquals(
         State.RUNNING,
@@ -275,7 +284,7 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
 
     long startTime = fastClock.nanoTime();
     assertEquals(
@@ -314,7 +323,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<?> values = job.getAggregatorValues(aggregator);
 
@@ -349,7 +358,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<?> values = job.getAggregatorValues(aggregator);
 
@@ -395,7 +404,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -463,7 +472,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -512,7 +521,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -530,7 +539,7 @@ public class DataflowPipelineJobTest {
         ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("not used in this pipeline");
@@ -566,7 +575,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
 
     thrown.expect(AggregatorRetrievalException.class);
     thrown.expectCause(is(cause));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 221cd0d..b4bbd39 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -67,6 +67,7 @@ import com.google.common.collect.Lists;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -83,7 +84,6 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
 
 /** Tests for {@link TestDataflowRunner}. */
 @RunWith(JUnit4.class)
@@ -175,7 +175,7 @@ public class TestDataflowRunnerTest {
     when(mockJob.getState()).thenReturn(State.RUNNING);
     when(mockJob.getProjectId()).thenReturn("test-project");
     when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenAnswer(new Answer<State>() {
           @Override
           public State answer(InvocationOnMock invocation) {
@@ -183,7 +183,7 @@ public class TestDataflowRunnerTest {
             message.setMessageText("FooException");
             message.setTime(TimeUtil.toCloudTime(Instant.now()));
             message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
                 .process(Arrays.asList(message));
             return State.CANCELLED;
           }
@@ -259,7 +259,7 @@ public class TestDataflowRunnerTest {
   @Test
   public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
     DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -274,7 +274,7 @@ public class TestDataflowRunnerTest {
   @Test
   public void testCheckingForSuccessWhenPAssertFails() throws Exception {
     DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -289,7 +289,7 @@ public class TestDataflowRunnerTest {
   @Test
   public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
     DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -325,7 +325,7 @@ public class TestDataflowRunnerTest {
   @Test
   public void testStreamingPipelineFailsIfServiceFails() throws Exception {
     DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -348,7 +348,7 @@ public class TestDataflowRunnerTest {
     when(mockJob.getState()).thenReturn(State.RUNNING);
     when(mockJob.getProjectId()).thenReturn("test-project");
     when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenAnswer(new Answer<State>() {
           @Override
           public State answer(InvocationOnMock invocation) {
@@ -356,7 +356,7 @@ public class TestDataflowRunnerTest {
             message.setMessageText("FooException");
             message.setTime(TimeUtil.toCloudTime(Instant.now()));
             message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
                 .process(Arrays.asList(message));
             return State.CANCELLED;
           }
@@ -422,7 +422,7 @@ public class TestDataflowRunnerTest {
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
 
     when(request.execute()).thenReturn(
@@ -472,7 +472,7 @@ public class TestDataflowRunnerTest {
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
 
     when(request.execute()).thenReturn(
@@ -503,8 +503,8 @@ public class TestDataflowRunnerTest {
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
-      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
-          any(JobMessagesHandler.class));
+      verify(mockJob, Mockito.times(1)).waitUntilFinish(
+          any(Duration.class), any(JobMessagesHandler.class));
       return;
     }
     fail("Expected an exception on pipeline failure.");
@@ -529,7 +529,7 @@ public class TestDataflowRunnerTest {
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
         .thenReturn(State.FAILED);
 
     when(request.execute()).thenReturn(
@@ -537,7 +537,7 @@ public class TestDataflowRunnerTest {
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
-      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+      verify(mockJob, Mockito.times(1)).waitUntilFinish(any(Duration.class),
           any(JobMessagesHandler.class));
       return;
     }
@@ -560,8 +560,8 @@ public class TestDataflowRunnerTest {
         fail(String.format("Expected PipelineResult but received %s", o));
       }
       try {
-        verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
-            any(JobMessagesHandler.class));
+        verify(mockJob, Mockito.times(called)).waitUntilFinish(
+            any(Duration.class), any(JobMessagesHandler.class));
       } catch (IOException | InterruptedException e) {
         throw new AssertionError(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index d737f5e..169c2af 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -44,7 +44,9 @@ import com.google.common.collect.Iterables;
 
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.joda.time.Duration;
 
+import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -286,4 +288,23 @@ public class EvaluationContext implements EvaluationResult {
   public State getState() {
     return State.DONE;
   }
+
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException(
+        "Spark runner EvaluationContext does not support cancel.");
+  }
+
+  @Override
+  public State waitUntilFinish()
+      throws IOException, InterruptedException {
+    return waitUntilFinish(Duration.millis(-1));
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException(
+        "Spark runner EvaluationContext does not support waitUntilFinish.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index f67cb47..993962c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -21,6 +21,10 @@ import org.apache.beam.sdk.runners.AggregatorRetrievalException;
 import org.apache.beam.sdk.runners.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 
+import org.joda.time.Duration;
+
+import java.io.IOException;
+
 /**
  * Result of {@link Pipeline#run()}.
  */
@@ -34,6 +38,40 @@ public interface PipelineResult {
   State getState();
 
   /**
+   * Cancels the pipeline execution.
+   *
+   * @throws IOException if there is a problem executing the cancel request.
+   * @throws UnsupportedOperationException if the runner does not support cancellation.
+   */
+  State cancel() throws IOException;
+
+  /**
+   * Waits until the pipeline finishes and returns the final status.
+   * It times out after the given duration.
+   *
+   * @param duration The time to wait for the pipeline to finish.
+   *     Provide a value less than 1 ms for an infinite wait.
+   *
+   * @return The final state of the pipeline or null on timeout.
+   * @throws IOException If there is a persistent problem getting job
+   *   information.
+   * @throws InterruptedException if the thread is interrupted.
+   * @throws UnsupportedOperationException if the runner does not support waitUntilFinish.
+   */
+  State waitUntilFinish(Duration duration) throws IOException, InterruptedException;
+
+  /**
+   * Waits until the pipeline finishes and returns the final status.
+   *
+   * @return The final state of the pipeline.
+   * @throws IOException If there is a persistent problem getting job
+   *   information.
+   * @throws InterruptedException if the thread is interrupted.
+   * @throws UnsupportedOperationException if the runner does not support waitUntilFinish.
+   */
+  State waitUntilFinish() throws IOException, InterruptedException;
+
+  /**
    * Retrieves the current value of the provided {@link Aggregator}.
    *
    * @param aggregator the {@link Aggregator} to retrieve values for.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
index 8eb95c0..fa29fdd 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
@@ -49,7 +49,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import javax.servlet.http.HttpServletResponse;
 
@@ -309,12 +308,12 @@ public class DataflowExampleUtils {
         addShutdownHook(jobsToCancel);
       }
       try {
-        job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
+        job.waitUntilFinish();
       } catch (Exception e) {
         throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
       }
     } else {
-      // Do nothing if the given PipelineResult doesn't support waitToFinish(),
+      // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
       // such as EvaluationResults returned by DirectRunner.
     }
   }