You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/23 20:48:25 UTC

[1/2] beam git commit: Makes DataflowPipelineJob.cancel() idempotent.

Repository: beam
Updated Branches:
  refs/heads/master 47ba7b033 -> def96a2bb


Makes DataflowPipelineJob.cancel() idempotent.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/081664eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/081664eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/081664eb

Branch: refs/heads/master
Commit: 081664eb6490a28ec320ab52e2b7a81bac03391e
Parents: 47ba7b0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 22 16:39:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Mar 23 13:47:49 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 71 ++++++++++++++------
 .../dataflow/DataflowPipelineJobTest.java       |  2 +-
 2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/081664eb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 732e0af..5ad6f9f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -34,6 +34,10 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -338,29 +342,56 @@ public class DataflowPipelineJob implements PipelineResult {
     return null;  // Timed out.
   }
 
+  private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
+
   @Override
   public State cancel() throws IOException {
-    Job content = new Job();
-    content.setProjectId(getProjectId());
-    content.setId(jobId);
-    content.setRequestedState("JOB_STATE_CANCELLED");
-    try {
-      dataflowClient.updateJob(jobId, content);
-      return State.CANCELLED;
-    } catch (IOException e) {
-      State state = getState();
-      if (state.isTerminal()) {
-        LOG.warn("Job is already terminated. State is {}", state);
-        return state;
-      } else {
-        String errorMsg = String.format(
-            "Failed to cancel job in state %s, "
-                + "please go to the Developers Console to cancel it manually: %s",
-            state,
-            MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
-        LOG.warn(errorMsg);
-        throw new IOException(errorMsg, e);
+    // Enforce that a cancel() call on the job is done at most once - as
+    // a workaround for the Dataflow service's current bugs with repeated
+    // cancellation, where it may sometimes return an error when cancelling
+    // a job that was already cancelled, but still report the job state as
+    // RUNNING.
+    // To partially work around these issues, we absorb duplicate cancel()
+    // calls. This, of course, doesn't address the case when the job terminates
+    // externally almost concurrently with a call to cancel(), but at least it
+    // makes it possible to safely call cancel() multiple times and from
+    // multiple threads in one program.
+    FutureTask<State> tentativeCancelTask = new FutureTask<>(new Callable<State>() {
+      @Override
+      public State call() throws Exception {
+        Job content = new Job();
+        content.setProjectId(getProjectId());
+        content.setId(jobId);
+        content.setRequestedState("JOB_STATE_CANCELLED");
+        try {
+          Job job = dataflowClient.updateJob(jobId, content);
+          return MonitoringUtil.toState(job.getCurrentState());
+        } catch (IOException e) {
+          State state = getState();
+          if (state.isTerminal()) {
+            LOG.warn("Job is already terminated. State is {}", state);
+            return state;
+          } else {
+            String errorMsg = String.format(
+                "Failed to cancel job in state %s, "
+                    + "please go to the Developers Console to cancel it manually: %s",
+                state,
+                MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+            LOG.warn(errorMsg);
+            throw new IOException(errorMsg, e);
+          }
+        }
       }
+    });
+    if (cancelState.compareAndSet(null, tentativeCancelTask)) {
+      // This thread should perform cancellation, while others will
+      // only wait for the result.
+      cancelState.get().run();
+    }
+    try {
+      return cancelState.get().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IOException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/081664eb/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 5d4264d..108badd 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -723,7 +723,7 @@ public class DataflowPipelineJobTest {
         mock(Dataflow.Projects.Locations.Jobs.Update.class);
     when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
         .thenReturn(update);
-    when(update.execute()).thenReturn(new Job());
+    when(update.execute()).thenReturn(new Job().setCurrentState("JOB_STATE_CANCELLED"));
 
     DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 


[2/2] beam git commit: This closes #2295

Posted by jk...@apache.org.
This closes #2295


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/def96a2b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/def96a2b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/def96a2b

Branch: refs/heads/master
Commit: def96a2bbe0b6299034c18cda4b41decfda6d4f7
Parents: 47ba7b0 081664e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 23 13:48:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Mar 23 13:48:08 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 71 ++++++++++++++------
 .../dataflow/DataflowPipelineJobTest.java       |  2 +-
 2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------