You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/23 20:48:25 UTC
[1/2] beam git commit: Makes DataflowPipelineJob.cancel() idempotent.
Repository: beam
Updated Branches:
refs/heads/master 47ba7b033 -> def96a2bb
Makes DataflowPipelineJob.cancel() idempotent.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/081664eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/081664eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/081664eb
Branch: refs/heads/master
Commit: 081664eb6490a28ec320ab52e2b7a81bac03391e
Parents: 47ba7b0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 22 16:39:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Mar 23 13:47:49 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 71 ++++++++++++++------
.../dataflow/DataflowPipelineJobTest.java | 2 +-
2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/081664eb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 732e0af..5ad6f9f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -34,6 +34,10 @@ import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -338,29 +342,56 @@ public class DataflowPipelineJob implements PipelineResult {
return null; // Timed out.
}
+ private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
+
@Override
public State cancel() throws IOException {
- Job content = new Job();
- content.setProjectId(getProjectId());
- content.setId(jobId);
- content.setRequestedState("JOB_STATE_CANCELLED");
- try {
- dataflowClient.updateJob(jobId, content);
- return State.CANCELLED;
- } catch (IOException e) {
- State state = getState();
- if (state.isTerminal()) {
- LOG.warn("Job is already terminated. State is {}", state);
- return state;
- } else {
- String errorMsg = String.format(
- "Failed to cancel job in state %s, "
- + "please go to the Developers Console to cancel it manually: %s",
- state,
- MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
- LOG.warn(errorMsg);
- throw new IOException(errorMsg, e);
+ // Enforce that a cancel() call on the job is done at most once - as
+ // a workaround for Dataflow service's current bugs with multiple
+ // cancellation, where it may sometimes return an error when cancelling
+ // a job that was already cancelled, but still report the job state as
+ // RUNNING.
+ // To partially work around these issues, we absorb duplicate cancel()
+ // calls. This, of course, doesn't address the case when the job terminates
+ // externally almost concurrently to calling cancel(), but at least it
+ // makes it possible to safely call cancel() multiple times and from
+ // multiple threads in one program.
+ FutureTask<State> tentativeCancelTask = new FutureTask<>(new Callable<State>() {
+ @Override
+ public State call() throws Exception {
+ Job content = new Job();
+ content.setProjectId(getProjectId());
+ content.setId(jobId);
+ content.setRequestedState("JOB_STATE_CANCELLED");
+ try {
+ Job job = dataflowClient.updateJob(jobId, content);
+ return MonitoringUtil.toState(job.getCurrentState());
+ } catch (IOException e) {
+ State state = getState();
+ if (state.isTerminal()) {
+ LOG.warn("Job is already terminated. State is {}", state);
+ return state;
+ } else {
+ String errorMsg = String.format(
+ "Failed to cancel job in state %s, "
+ + "please go to the Developers Console to cancel it manually: %s",
+ state,
+ MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+ LOG.warn(errorMsg);
+ throw new IOException(errorMsg, e);
+ }
+ }
}
+ });
+ if (cancelState.compareAndSet(null, tentativeCancelTask)) {
+ // This thread should perform cancellation, while others will
+ // only wait for the result.
+ cancelState.get().run();
+ }
+ try {
+ return cancelState.get().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/081664eb/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 5d4264d..108badd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -723,7 +723,7 @@ public class DataflowPipelineJobTest {
mock(Dataflow.Projects.Locations.Jobs.Update.class);
when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
.thenReturn(update);
- when(update.execute()).thenReturn(new Job());
+ when(update.execute()).thenReturn(new Job().setCurrentState("JOB_STATE_CANCELLED"));
DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
[2/2] beam git commit: This closes #2295
Posted by jk...@apache.org.
This closes #2295
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/def96a2b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/def96a2b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/def96a2b
Branch: refs/heads/master
Commit: def96a2bbe0b6299034c18cda4b41decfda6d4f7
Parents: 47ba7b0 081664e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 23 13:48:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Mar 23 13:48:08 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 71 ++++++++++++++------
.../dataflow/DataflowPipelineJobTest.java | 2 +-
2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------