You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/29 03:00:37 UTC
[1/2] incubator-beam git commit: Closes #642
Repository: incubator-beam
Updated Branches:
refs/heads/master 1df6f5f97 -> 119da4a82
Closes #642
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/119da4a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/119da4a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/119da4a8
Branch: refs/heads/master
Commit: 119da4a82611662486f57536168443277c926790
Parents: 1df6f5f 1b4b7c7
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jul 28 20:00:27 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 28 20:00:27 2016 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 4 +-
.../beam/runners/direct/DirectRunner.java | 18 +++++
.../beam/runners/flink/FlinkRunnerResult.java | 22 ++++-
.../dataflow/BlockingDataflowRunner.java | 5 +-
.../runners/dataflow/DataflowPipelineJob.java | 85 +++++++++++---------
.../beam/runners/dataflow/DataflowRunner.java | 5 +-
.../dataflow/testing/TestDataflowRunner.java | 7 +-
.../dataflow/BlockingDataflowRunnerTest.java | 9 +--
.../dataflow/DataflowPipelineJobTest.java | 43 ++++++----
.../testing/TestDataflowRunnerTest.java | 34 ++++----
.../spark/translation/EvaluationContext.java | 21 +++++
.../org/apache/beam/sdk/PipelineResult.java | 38 +++++++++
.../main/java/common/DataflowExampleUtils.java | 5 +-
13 files changed, 202 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: [BEAM-443] Add waitToFinish() and
cancel() in PipelineResult.
Posted by dh...@apache.org.
[BEAM-443] Add waitToFinish() and cancel() in PipelineResult.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b4b7c76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b4b7c76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b4b7c76
Branch: refs/heads/master
Commit: 1b4b7c762825bba90b80b4548bc442ae9c813dca
Parents: 1df6f5f
Author: Pei He <pe...@google.com>
Authored: Mon Jul 11 19:42:02 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 28 20:00:27 2016 -0700
----------------------------------------------------------------------
.../beam/examples/common/ExampleUtils.java | 4 +-
.../beam/runners/direct/DirectRunner.java | 18 +++++
.../beam/runners/flink/FlinkRunnerResult.java | 22 ++++-
.../dataflow/BlockingDataflowRunner.java | 5 +-
.../runners/dataflow/DataflowPipelineJob.java | 85 +++++++++++---------
.../beam/runners/dataflow/DataflowRunner.java | 5 +-
.../dataflow/testing/TestDataflowRunner.java | 7 +-
.../dataflow/BlockingDataflowRunnerTest.java | 9 +--
.../dataflow/DataflowPipelineJobTest.java | 43 ++++++----
.../testing/TestDataflowRunnerTest.java | 34 ++++----
.../spark/translation/EvaluationContext.java | 21 +++++
.../org/apache/beam/sdk/PipelineResult.java | 38 +++++++++
.../main/java/common/DataflowExampleUtils.java | 5 +-
13 files changed, 202 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 8f9be31..8b66861 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -306,12 +306,12 @@ public class ExampleUtils {
addShutdownHook(jobsToCancel);
}
try {
- job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
+ job.waitUntilFinish();
} catch (Exception e) {
throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
}
} else {
- // Do nothing if the given PipelineResult doesn't support waitToFinish(),
+ // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
// such as EvaluationResults returned by DirectRunner.
tearDown();
printPendingMessages();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 72194da..743c565 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -51,8 +51,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.joda.time.Duration;
import org.joda.time.Instant;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -362,5 +364,21 @@ public class DirectRunner
}
return state;
}
+
+ @Override
+ public State cancel() throws IOException {
+ throw new UnsupportedOperationException("DirectPipelineResult does not support cancel.");
+ }
+
+ @Override
+ public State waitUntilFinish() throws IOException {
+ return waitUntilFinish(Duration.millis(-1));
+ }
+
+ @Override
+ public State waitUntilFinish(Duration duration) throws IOException {
+ throw new UnsupportedOperationException(
+ "DirectPipelineResult does not support waitUntilFinish.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index a8f4cac..cae0b2a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -22,6 +22,9 @@ import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
@@ -31,16 +34,12 @@ import java.util.Map;
* {@link org.apache.beam.sdk.transforms.Aggregator}s.
*/
public class FlinkRunnerResult implements PipelineResult {
-
private final Map<String, Object> aggregators;
-
private final long runtime;
-
public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
Collections.<String, Object>emptyMap() :
Collections.unmodifiableMap(aggregators);
-
this.runtime = runtime;
}
@@ -73,4 +72,19 @@ public class FlinkRunnerResult implements PipelineResult {
", runtime=" + runtime +
'}';
}
+
+ @Override
+ public State cancel() throws IOException {
+ throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel.");
+ }
+
+ @Override
+ public State waitUntilFinish() {
+ return waitUntilFinish(Duration.millis(-1));
+ }
+
+ @Override
+ public State waitUntilFinish(Duration duration) {
+ throw new UnsupportedOperationException("FlinkRunnerResult does not support waitUntilFinish.");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
index f7f7dc8..e7cd67e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
@@ -29,11 +29,11 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -116,8 +116,7 @@ public class BlockingDataflowRunner extends
@Nullable
State result;
try {
- result = job.waitToFinish(
- BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
+ result = job.waitUntilFinish(Duration.standardSeconds(BUILTIN_JOB_TIMEOUT_SEC));
} catch (IOException | InterruptedException ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 1b3dd43..a02d280 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -21,6 +21,7 @@ import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
@@ -35,13 +36,13 @@ import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +75,7 @@ public class DataflowPipelineJob implements PipelineResult {
* Client for the Dataflow service. This can be used to query the service
* for information about the job.
*/
- private Dataflow dataflowClient;
+ private DataflowPipelineOptions dataflowOptions;
/**
* The state the job terminated in or {@code null} if the job has not terminated.
@@ -112,13 +113,17 @@ public class DataflowPipelineJob implements PipelineResult {
*
* @param projectId the project id
* @param jobId the job id
- * @param dataflowClient the client for the Dataflow Service
+ * @param dataflowOptions the client for the Dataflow Service
+ * @param aggregatorTransforms a mapping from aggregators to PTransforms
*/
- public DataflowPipelineJob(String projectId, String jobId, Dataflow dataflowClient,
+ public DataflowPipelineJob(
+ String projectId,
+ String jobId,
+ DataflowPipelineOptions dataflowOptions,
DataflowAggregatorTransforms aggregatorTransforms) {
this.projectId = projectId;
this.jobId = jobId;
- this.dataflowClient = dataflowClient;
+ this.dataflowOptions = dataflowOptions;
this.aggregatorTransforms = aggregatorTransforms;
}
@@ -152,12 +157,25 @@ public class DataflowPipelineJob implements PipelineResult {
return replacedByJob;
}
+ @Override
+ @Nullable
+ public State waitUntilFinish() throws IOException, InterruptedException {
+ return waitUntilFinish(Duration.millis(-1));
+ }
+
+ @Override
+ @Nullable
+ public State waitUntilFinish(Duration duration)
+ throws IOException, InterruptedException {
+ return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
+ }
+
/**
- * Waits for the job to finish and return the final status.
+ * Waits until the pipeline finishes and returns the final status.
*
- * @param timeToWait The time to wait in units timeUnit for the job to finish.
+ * @param duration The time to wait for the job to finish.
* Provide a value less than 1 ms for an infinite wait.
- * @param timeUnit The unit of time for timeToWait.
+ *
* @param messageHandler If non null this handler will be invoked for each
* batch of messages received.
* @return The final state of the job or null on timeout or if the
@@ -167,48 +185,45 @@ public class DataflowPipelineJob implements PipelineResult {
* @throws InterruptedException
*/
@Nullable
- public State waitToFinish(
- long timeToWait,
- TimeUnit timeUnit,
- MonitoringUtil.JobMessagesHandler messageHandler)
- throws IOException, InterruptedException {
- return waitToFinish(timeToWait, timeUnit, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+ @VisibleForTesting
+ public State waitUntilFinish(
+ Duration duration,
+ MonitoringUtil.JobMessagesHandler messageHandler) throws IOException, InterruptedException {
+ return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
}
/**
- * Wait for the job to finish and return the final status.
+ * Waits until the pipeline finishes and returns the final status.
*
- * @param timeToWait The time to wait in units timeUnit for the job to finish.
+ * @param duration The time to wait for the job to finish.
* Provide a value less than 1 ms for an infinite wait.
- * @param timeUnit The unit of time for timeToWait.
+ *
* @param messageHandler If non null this handler will be invoked for each
* batch of messages received.
* @param sleeper A sleeper to use to sleep between attempts.
* @param nanoClock A nanoClock used to time the total time taken.
- * @return The final state of the job or null on timeout or if the
- * thread is interrupted.
+ * @return The final state of the job or null on timeout.
* @throws IOException If there is a persistent problem getting job
* information.
- * @throws InterruptedException
+ * @throws InterruptedException if the thread is interrupted.
*/
@Nullable
@VisibleForTesting
- State waitToFinish(
- long timeToWait,
- TimeUnit timeUnit,
+ State waitUntilFinish(
+ Duration duration,
MonitoringUtil.JobMessagesHandler messageHandler,
Sleeper sleeper,
NanoClock nanoClock)
throws IOException, InterruptedException {
- MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowClient);
+ MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
long lastTimestamp = 0;
BackOff backoff =
- timeUnit.toMillis(timeToWait) > 0
+ duration.getMillis() > 0
? new AttemptAndTimeBoundedExponentialBackOff(
MESSAGES_POLLING_ATTEMPTS,
MESSAGES_POLLING_INTERVAL,
- timeUnit.toMillis(timeToWait),
+ duration.getMillis(),
AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS,
nanoClock)
: new AttemptBoundedExponentialBackOff(
@@ -250,18 +265,16 @@ public class DataflowPipelineJob implements PipelineResult {
return null; // Timed out.
}
- /**
- * Cancels the job.
- * @throws IOException if there is a problem executing the cancel request.
- */
- public void cancel() throws IOException {
+ @Override
+ public State cancel() throws IOException {
Job content = new Job();
content.setProjectId(projectId);
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
- dataflowClient.projects().jobs()
+ dataflowOptions.getDataflowClient().projects().jobs()
.update(projectId, jobId, content)
.execute();
+ return State.CANCELLED;
}
@Override
@@ -315,7 +328,7 @@ public class DataflowPipelineJob implements PipelineResult {
// Retry loop ends in return or throw
while (true) {
try {
- Job job = dataflowClient
+ Job job = dataflowOptions.getDataflowClient()
.projects()
.jobs()
.get(projectId, jobId)
@@ -324,7 +337,7 @@ public class DataflowPipelineJob implements PipelineResult {
if (currentState.isTerminal()) {
terminalState = currentState;
replacedByJob = new DataflowPipelineJob(
- getProjectId(), job.getReplacedByJobId(), dataflowClient, aggregatorTransforms);
+ getProjectId(), job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
}
return job;
} catch (IOException exn) {
@@ -371,8 +384,8 @@ public class DataflowPipelineJob implements PipelineResult {
metricUpdates = terminalMetricUpdates;
} else {
boolean terminal = getState().isTerminal();
- JobMetrics jobMetrics =
- dataflowClient.projects().jobs().getMetrics(projectId, jobId).execute();
+ JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
+ .projects().jobs().getMetrics(projectId, jobId).execute();
metricUpdates = jobMetrics.getMetrics();
if (terminal && jobMetrics.getMetrics() != null) {
terminalMetricUpdates = metricUpdates;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8f9e76e..7191fe8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -606,9 +606,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Use a raw client for post-launch monitoring, as status calls may fail
// regularly and need not be retried automatically.
- DataflowPipelineJob dataflowPipelineJob =
- new DataflowPipelineJob(options.getProject(), jobResult.getId(),
- options.getDataflowClient(), aggregatorTransforms);
+ DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(
+ options.getProject(), jobResult.getId(), options, aggregatorTransforms);
// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index b60e1be..f74f4dd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -41,6 +41,8 @@ import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
+
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +52,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
/**
* {@link TestDataflowRunner} is a pipeline runner that wraps a
@@ -131,7 +132,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
});
- State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler);
+ State finalState = job.waitUntilFinish(Duration.standardMinutes(10L), messageHandler);
if (finalState == null || finalState == State.RUNNING) {
LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
job.getJobId());
@@ -139,7 +140,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
result = resultFuture.get();
} else {
- job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
+ job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
result = checkForSuccess(job);
}
if (!result.isPresent()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
index 7be074e..7bdac3d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
@@ -19,14 +19,13 @@ package org.apache.beam.runners.dataflow;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -40,14 +39,13 @@ import org.hamcrest.Description;
import org.hamcrest.Factory;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import java.util.concurrent.TimeUnit;
-
/**
* Tests for BlockingDataflowRunner.
*/
@@ -181,8 +179,7 @@ public class BlockingDataflowRunnerTest {
DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
when(mockJob.getProjectId()).thenReturn(projectId);
when(mockJob.getJobId()).thenReturn(jobId);
- when(mockJob.waitToFinish(
- anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
+ when(mockJob.waitUntilFinish(any(Duration.class)))
.thenReturn(terminalState);
return mockJob;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 80b7e7b..343d538 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -33,8 +33,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
@@ -59,6 +61,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
+import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -93,12 +96,17 @@ public class DataflowPipelineJobTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
+ private TestDataflowPipelineOptions options;
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(mockWorkflowClient.projects()).thenReturn(mockProjects);
when(mockProjects.jobs()).thenReturn(mockJobs);
+
+ options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+ options.setDataflowClient(mockWorkflowClient);
}
/**
@@ -146,9 +154,10 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+ PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
- State state = job.waitToFinish(5, TimeUnit.MINUTES, jobHandler, fastClock, fastClock);
+ State state = job.waitUntilFinish(
+ Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
assertEquals(null, state);
}
@@ -164,9 +173,9 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+ PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
- return job.waitToFinish(1, TimeUnit.MINUTES, null, fastClock, fastClock);
+ return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
}
/**
@@ -215,10 +224,10 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+ PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
- State state = job.waitToFinish(5, TimeUnit.MINUTES, null, fastClock, fastClock);
+ State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL,
@@ -235,9 +244,9 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+ PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
- State state = job.waitToFinish(4, TimeUnit.MILLISECONDS, null, fastClock, fastClock);
+ State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
assertEquals(null, state);
long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
// Should only sleep for the 4 ms remaining.
@@ -258,7 +267,7 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+ PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
assertEquals(
State.RUNNING,
@@ -275,7 +284,7 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, mockWorkflowClient, dataflowAggregatorTransforms);
+ PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
assertEquals(
@@ -314,7 +323,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+ new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
AggregatorValues<?> values = job.getAggregatorValues(aggregator);
@@ -349,7 +358,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+ new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
AggregatorValues<?> values = job.getAggregatorValues(aggregator);
@@ -395,7 +404,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+ new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -463,7 +472,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+ new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -512,7 +521,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+ new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -530,7 +539,7 @@ public class DataflowPipelineJobTest {
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+ new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("not used in this pipeline");
@@ -566,7 +575,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, mockWorkflowClient, aggregatorTransforms);
+ new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
thrown.expect(AggregatorRetrievalException.class);
thrown.expectCause(is(cause));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 221cd0d..b4bbd39 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -67,6 +67,7 @@ import com.google.common.collect.Lists;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
@@ -83,7 +84,6 @@ import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
/** Tests for {@link TestDataflowRunner}. */
@RunWith(JUnit4.class)
@@ -175,7 +175,7 @@ public class TestDataflowRunnerTest {
when(mockJob.getState()).thenReturn(State.RUNNING);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenAnswer(new Answer<State>() {
@Override
public State answer(InvocationOnMock invocation) {
@@ -183,7 +183,7 @@ public class TestDataflowRunnerTest {
message.setMessageText("FooException");
message.setTime(TimeUtil.toCloudTime(Instant.now()));
message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
.process(Arrays.asList(message));
return State.CANCELLED;
}
@@ -259,7 +259,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ spy(new DataflowPipelineJob("test-project", "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -274,7 +274,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessWhenPAssertFails() throws Exception {
DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ spy(new DataflowPipelineJob("test-project", "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -289,7 +289,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ spy(new DataflowPipelineJob("test-project", "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -325,7 +325,7 @@ public class TestDataflowRunnerTest {
@Test
public void testStreamingPipelineFailsIfServiceFails() throws Exception {
DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ spy(new DataflowPipelineJob("test-project", "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -348,7 +348,7 @@ public class TestDataflowRunnerTest {
when(mockJob.getState()).thenReturn(State.RUNNING);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenAnswer(new Answer<State>() {
@Override
public State answer(InvocationOnMock invocation) {
@@ -356,7 +356,7 @@ public class TestDataflowRunnerTest {
message.setMessageText("FooException");
message.setTime(TimeUtil.toCloudTime(Instant.now()));
message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
.process(Arrays.asList(message));
return State.CANCELLED;
}
@@ -422,7 +422,7 @@ public class TestDataflowRunnerTest {
p.getOptions().as(TestPipelineOptions.class)
.setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
when(request.execute()).thenReturn(
@@ -472,7 +472,7 @@ public class TestDataflowRunnerTest {
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
when(request.execute()).thenReturn(
@@ -503,8 +503,8 @@ public class TestDataflowRunnerTest {
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
- verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
- any(JobMessagesHandler.class));
+ verify(mockJob, Mockito.times(1)).waitUntilFinish(
+ any(Duration.class), any(JobMessagesHandler.class));
return;
}
fail("Expected an exception on pipeline failure.");
@@ -529,7 +529,7 @@ public class TestDataflowRunnerTest {
p.getOptions().as(TestPipelineOptions.class)
.setOnSuccessMatcher(new TestFailureMatcher());
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.FAILED);
when(request.execute()).thenReturn(
@@ -537,7 +537,7 @@ public class TestDataflowRunnerTest {
try {
runner.run(p, mockRunner);
} catch (AssertionError expected) {
- verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+ verify(mockJob, Mockito.times(1)).waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class));
return;
}
@@ -560,8 +560,8 @@ public class TestDataflowRunnerTest {
fail(String.format("Expected PipelineResult but received %s", o));
}
try {
- verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
- any(JobMessagesHandler.class));
+ verify(mockJob, Mockito.times(called)).waitUntilFinish(
+ any(Duration.class), any(JobMessagesHandler.class));
} catch (IOException | InterruptedException e) {
throw new AssertionError(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index d737f5e..169c2af 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -44,7 +44,9 @@ import com.google.common.collect.Iterables;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
+import org.joda.time.Duration;
+import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -286,4 +288,23 @@ public class EvaluationContext implements EvaluationResult {
public State getState() {
return State.DONE;
}
+
+ @Override
+ public State cancel() throws IOException {
+ throw new UnsupportedOperationException(
+ "Spark runner EvaluationContext does not support cancel.");
+ }
+
+ @Override
+ public State waitUntilFinish()
+ throws IOException, InterruptedException {
+ return waitUntilFinish(Duration.millis(-1));
+ }
+
+ @Override
+ public State waitUntilFinish(Duration duration)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException(
+ "Spark runner EvaluationContext does not support waitUntilFinish.");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
index f67cb47..993962c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -21,6 +21,10 @@ import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+
/**
* Result of {@link Pipeline#run()}.
*/
@@ -34,6 +38,40 @@ public interface PipelineResult {
State getState();
/**
+ * Cancels the pipeline execution.
+ *
+ * @throws IOException if there is a problem executing the cancel request.
+ * @throws UnsupportedOperationException if the runner does not support cancellation.
+ */
+ State cancel() throws IOException;
+
+ /**
+ * Waits until the pipeline finishes and returns the final status.
+ * It times out after the given duration.
+ *
+ * @param duration The time to wait for the pipeline to finish.
+ * Provide a value less than 1 ms for an infinite wait.
+ *
+ * @return The final state of the pipeline or null on timeout.
+ * @throws IOException If there is a persistent problem getting job
+ * information.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws UnsupportedOperationException if the runner does not support cancellation.
+ */
+ State waitUntilFinish(Duration duration) throws IOException, InterruptedException;
+
+ /**
+ * Waits until the pipeline finishes and returns the final status.
+ *
+ * @return The final state of the pipeline.
+ * @throws IOException If there is a persistent problem getting job
+ * information.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws UnsupportedOperationException if the runner does not support cancellation.
+ */
+ State waitUntilFinish() throws IOException, InterruptedException;
+
+ /**
* Retrieves the current value of the provided {@link Aggregator}.
*
* @param aggregator the {@link Aggregator} to retrieve values for.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b4b7c76/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
index 8eb95c0..fa29fdd 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
@@ -49,7 +49,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
@@ -309,12 +308,12 @@ public class DataflowExampleUtils {
addShutdownHook(jobsToCancel);
}
try {
- job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
+ job.waitUntilFinish();
} catch (Exception e) {
throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
}
} else {
- // Do nothing if the given PipelineResult doesn't support waitToFinish(),
+ // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
// such as EvaluationResults returned by DirectRunner.
}
}