You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/06 01:20:48 UTC

[1/2] beam git commit: DataflowPipelineJob: handle concurrent cancel and finish

Repository: beam
Updated Branches:
  refs/heads/master 512ad1355 -> 20788953f


DataflowPipelineJob: handle concurrent cancel and finish

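This makes job.cancel() return the job's state instead of throwing an exception when cancel() is called on a job that has already finished.
(Note that state.isTerminal() is not guaranteed to be up to date, so the cancel request can be rejected with a "has terminated" error even though the reported state is still non-terminal.)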


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/09580a73
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/09580a73
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/09580a73

Branch: refs/heads/master
Commit: 09580a731e7b67564315a56e43658300011512eb
Parents: 512ad13
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 4 15:33:21 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 5 18:20:29 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 14 +++++++-
 .../dataflow/DataflowPipelineJobTest.java       | 37 +++++++++++++++-----
 2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/09580a73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 5ad6f9f..7cb0f0e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -369,7 +369,19 @@ public class DataflowPipelineJob implements PipelineResult {
         } catch (IOException e) {
           State state = getState();
           if (state.isTerminal()) {
-            LOG.warn("Job is already terminated. State is {}", state);
+            LOG.warn("Cancel failed because job is already terminated. State is {}", state);
+            return state;
+          } else if (e.getMessage().contains("has terminated")) {
+            // This handles the case where the getState() call above returns RUNNING but the cancel
+            // was rejected because the job is in fact done. Hopefully, someday we can delete this
+            // code if there is better consistency between the State and whether Cancel succeeds.
+            //
+            // Example message:
+            //    Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform
+            //    operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has
+            //    terminated in state SUCCESS: Workflow job: 2017-04-01_22_50_59-9269855660514862348
+            //    succeeded.
+            LOG.warn("Cancel failed because job is already terminated.", e);
             return state;
           } else {
             String errorMsg = String.format(

http://git-wip-us.apache.org/repos/asf/beam/blob/09580a73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 108badd..e3d2e4e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -618,7 +618,6 @@ public class DataflowPipelineJobTest {
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("not used in this pipeline");
-
     job.getAggregatorValues(aggregator);
   }
 
@@ -656,7 +655,6 @@ public class DataflowPipelineJobTest {
     thrown.expectCause(is(cause));
     thrown.expectMessage(aggregator.toString());
     thrown.expectMessage("when retrieving Aggregator values for");
-
     job.getAggregatorValues(aggregator);
   }
 
@@ -750,7 +748,7 @@ public class DataflowPipelineJobTest {
         Dataflow.Projects.Locations.Jobs.Update.class);
     when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
         .thenReturn(update);
-    when(update.execute()).thenThrow(new IOException());
+    when(update.execute()).thenThrow(new IOException("Some random IOException"));
 
     DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
@@ -758,13 +756,34 @@ public class DataflowPipelineJobTest {
     thrown.expectMessage("Failed to cancel job in state RUNNING, "
         + "please go to the Developers Console to cancel it manually:");
     job.cancel();
+  }
 
-    Job content = new Job();
-    content.setProjectId(PROJECT_ID);
-    content.setId(JOB_ID);
-    content.setRequestedState("JOB_STATE_CANCELLED");
-    verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), eq(content));
-    verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID);
+  /**
+   * Test that {@link DataflowPipelineJob#cancel} doesn't throw if the Dataflow service returns
+   * non-terminal state even though the cancel API call failed, which can happen in practice.
+   *
+   * <p>TODO: delete this code if the API calls become consistent.
+   */
+  @Test
+  public void testCancelTerminatedJobWithStaleState() throws IOException {
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_RUNNING");
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    Dataflow.Projects.Locations.Jobs.Update update = mock(
+        Dataflow.Projects.Locations.Jobs.Update.class);
+    when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
+        .thenReturn(update);
+    when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS"));
+
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+    State returned = job.cancel();
+    assertThat(returned, equalTo(State.RUNNING));
+    expectedLogs.verifyWarn("Cancel failed because job is already terminated.");
   }
 
   @Test


[2/2] beam git commit: This closes #2428

Posted by dh...@apache.org.
This closes #2428


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/20788953
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/20788953
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/20788953

Branch: refs/heads/master
Commit: 20788953f9cc51dc20aec44423dbdbdd53cc8741
Parents: 512ad13 09580a7
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 5 18:20:36 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 5 18:20:36 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 14 +++++++-
 .../dataflow/DataflowPipelineJobTest.java       | 37 +++++++++++++++-----
 2 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------