You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/20 05:21:21 UTC

[1/2] incubator-beam git commit: Check Dataflow Job Status Before Terminate

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9c8e19c1c -> 9e7ed2929


Check Dataflow Job Status Before Terminate


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e776ae73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e776ae73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e776ae73

Branch: refs/heads/master
Commit: e776ae7334aed7e42d152dbbcfa7b5a1fb998e27
Parents: 9c8e19c
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Mon Sep 19 15:24:17 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Sep 19 21:29:21 2016 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 19 +++--
 .../dataflow/DataflowPipelineJobTest.java       | 74 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e776ae73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 1af8c98..269b824 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -301,14 +301,21 @@ public class DataflowPipelineJob implements PipelineResult {
       dataflowOptions.getDataflowClient().projects().jobs()
           .update(projectId, jobId, content)
           .execute();
+      return State.CANCELLED;
     } catch (IOException e) {
-      String errorMsg = String.format(
-          "Failed to cancel the job, please go to the Developers Console to cancel it manually: %s",
-          MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
-      LOG.warn(errorMsg);
-      throw new IOException(errorMsg, e);
+      State state = getState();
+      if (state.isTerminal()) {
+        LOG.warn("Job is already terminated. State is {}", state);
+        return state;
+      } else {
+        String errorMsg = String.format(
+            "Failed to cancel the job, "
+                + "please go to the Developers Console to cancel it manually: %s",
+            MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+        LOG.warn(errorMsg);
+        throw new IOException(errorMsg, e);
+      }
     }
-    return State.CANCELLED;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e776ae73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 4c70d12..2af95e2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -29,7 +29,11 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.api.client.util.NanoClock;
@@ -655,4 +659,74 @@ public class DataflowPipelineJobTest {
       fastNanoTime += millis * 1000000L + ThreadLocalRandom.current().nextInt(500000);
     }
   }
+
+  @Test
+  public void testCancelUnterminatedJobThatSucceeds() throws IOException {
+    Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
+    when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+    when(update.execute()).thenReturn(new Job());
+
+    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+
+    assertEquals(State.CANCELLED, job.cancel());
+    Job content = new Job();
+    content.setProjectId(PROJECT_ID);
+    content.setId(JOB_ID);
+    content.setRequestedState("JOB_STATE_CANCELLED");
+    verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
+    verifyNoMoreInteractions(mockJobs);
+  }
+
+  @Test
+  public void testCancelUnterminatedJobThatFails() throws IOException {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_RUNNING");
+    when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
+    when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+    when(update.execute()).thenThrow(new IOException());
+
+    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Failed to cancel the job, "
+        + "please go to the Developers Console to cancel it manually:");
+    job.cancel();
+
+    Job content = new Job();
+    content.setProjectId(PROJECT_ID);
+    content.setId(JOB_ID);
+    content.setRequestedState("JOB_STATE_CANCELLED");
+    verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
+    verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID));
+  }
+
+  @Test
+  public void testCancelTerminatedJob() throws IOException {
+    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+    Job statusResponse = new Job();
+    statusResponse.setCurrentState("JOB_STATE_FAILED");
+    when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest);
+    when(statusRequest.execute()).thenReturn(statusResponse);
+
+    Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
+    when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+    when(update.execute()).thenThrow(new IOException());
+
+    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+
+    assertEquals(State.FAILED, job.cancel());
+    Job content = new Job();
+    content.setProjectId(PROJECT_ID);
+    content.setId(JOB_ID);
+    content.setRequestedState("JOB_STATE_CANCELLED");
+    verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
+    verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID));
+    verifyNoMoreInteractions(mockJobs);
+  }
 }


[2/2] incubator-beam git commit: Check Dataflow Job Status Before Terminate

Posted by lc...@apache.org.
Check Dataflow Job Status Before Terminate

This closes #951


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9e7ed292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9e7ed292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9e7ed292

Branch: refs/heads/master
Commit: 9e7ed29290911bb341261240d8d799bd7f0a4e9b
Parents: 9c8e19c e776ae7
Author: Luke Cwik <lc...@google.com>
Authored: Mon Sep 19 21:30:04 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Sep 19 21:30:04 2016 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 19 +++--
 .../dataflow/DataflowPipelineJobTest.java       | 74 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 6 deletions(-)
----------------------------------------------------------------------