You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/20 05:21:21 UTC
[1/2] incubator-beam git commit: Check Dataflow Job Status Before Terminate
Repository: incubator-beam
Updated Branches:
refs/heads/master 9c8e19c1c -> 9e7ed2929
Check Dataflow Job Status Before Terminate
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e776ae73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e776ae73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e776ae73
Branch: refs/heads/master
Commit: e776ae7334aed7e42d152dbbcfa7b5a1fb998e27
Parents: 9c8e19c
Author: Mark Liu <ma...@markliu-macbookpro.roam.corp.google.com>
Authored: Mon Sep 19 15:24:17 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Sep 19 21:29:21 2016 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 19 +++--
.../dataflow/DataflowPipelineJobTest.java | 74 ++++++++++++++++++++
2 files changed, 87 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e776ae73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 1af8c98..269b824 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -301,14 +301,21 @@ public class DataflowPipelineJob implements PipelineResult {
dataflowOptions.getDataflowClient().projects().jobs()
.update(projectId, jobId, content)
.execute();
+ return State.CANCELLED;
} catch (IOException e) {
- String errorMsg = String.format(
- "Failed to cancel the job, please go to the Developers Console to cancel it manually: %s",
- MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
- LOG.warn(errorMsg);
- throw new IOException(errorMsg, e);
+ State state = getState();
+ if (state.isTerminal()) {
+ LOG.warn("Job is already terminated. State is {}", state);
+ return state;
+ } else {
+ String errorMsg = String.format(
+ "Failed to cancel the job, "
+ + "please go to the Developers Console to cancel it manually: %s",
+ MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+ LOG.warn(errorMsg);
+ throw new IOException(errorMsg, e);
+ }
}
- return State.CANCELLED;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e776ae73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 4c70d12..2af95e2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -29,7 +29,11 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.api.client.util.NanoClock;
@@ -655,4 +659,74 @@ public class DataflowPipelineJobTest {
fastNanoTime += millis * 1000000L + ThreadLocalRandom.current().nextInt(500000);
}
}
+
+ @Test
+ public void testCancelUnterminatedJobThatSucceeds() throws IOException {
+ Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
+ when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+ when(update.execute()).thenReturn(new Job());
+
+ DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+
+ assertEquals(State.CANCELLED, job.cancel());
+ Job content = new Job();
+ content.setProjectId(PROJECT_ID);
+ content.setId(JOB_ID);
+ content.setRequestedState("JOB_STATE_CANCELLED");
+ verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
+ verifyNoMoreInteractions(mockJobs);
+ }
+
+ @Test
+ public void testCancelUnterminatedJobThatFails() throws IOException {
+ Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+ Job statusResponse = new Job();
+ statusResponse.setCurrentState("JOB_STATE_RUNNING");
+ when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest);
+ when(statusRequest.execute()).thenReturn(statusResponse);
+
+ Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
+ when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+ when(update.execute()).thenThrow(new IOException());
+
+ DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage("Failed to cancel the job, "
+ + "please go to the Developers Console to cancel it manually:");
+ job.cancel();
+
+ Job content = new Job();
+ content.setProjectId(PROJECT_ID);
+ content.setId(JOB_ID);
+ content.setRequestedState("JOB_STATE_CANCELLED");
+ verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
+ verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID));
+ }
+
+ @Test
+ public void testCancelTerminatedJob() throws IOException {
+ Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+
+ Job statusResponse = new Job();
+ statusResponse.setCurrentState("JOB_STATE_FAILED");
+ when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest);
+ when(statusRequest.execute()).thenReturn(statusResponse);
+
+ Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
+ when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+ when(update.execute()).thenThrow(new IOException());
+
+ DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+
+ assertEquals(State.FAILED, job.cancel());
+ Job content = new Job();
+ content.setProjectId(PROJECT_ID);
+ content.setId(JOB_ID);
+ content.setRequestedState("JOB_STATE_CANCELLED");
+ verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
+ verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID));
+ verifyNoMoreInteractions(mockJobs);
+ }
}
[2/2] incubator-beam git commit: Check Dataflow Job Status Before Terminate
Posted by lc...@apache.org.
Check Dataflow Job Status Before Terminate
This closes #951
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9e7ed292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9e7ed292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9e7ed292
Branch: refs/heads/master
Commit: 9e7ed29290911bb341261240d8d799bd7f0a4e9b
Parents: 9c8e19c e776ae7
Author: Luke Cwik <lc...@google.com>
Authored: Mon Sep 19 21:30:04 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Sep 19 21:30:04 2016 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 19 +++--
.../dataflow/DataflowPipelineJobTest.java | 74 ++++++++++++++++++++
2 files changed, 87 insertions(+), 6 deletions(-)
----------------------------------------------------------------------