You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/12/07 01:15:45 UTC
[3/4] incubator-beam git commit: [Code Health] Remove redundant projectId from DataflowPipelineJob.
[Code Health] Remove redundant projectId from DataflowPipelineJob.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ded58832
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ded58832
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ded58832
Branch: refs/heads/master
Commit: ded58832ceaef487f4590d9396f09744288c955d
Parents: afedd68
Author: Pei He <pe...@google.com>
Authored: Wed Nov 23 16:14:27 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 22 +++------
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../dataflow/util/DataflowTemplateJob.java | 2 +-
.../dataflow/DataflowPipelineJobTest.java | 48 ++++++++++----------
.../testing/TestDataflowRunnerTest.java | 36 +++++----------
5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index a2b632f..58e85e0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -62,11 +62,6 @@ public class DataflowPipelineJob implements PipelineResult {
private String jobId;
/**
- * Google cloud project to associate this pipeline with.
- */
- private String projectId;
-
- /**
* Client for the Dataflow service. This can be used to query the service
* for information about the job.
*/
@@ -119,17 +114,14 @@ public class DataflowPipelineJob implements PipelineResult {
/**
* Constructs the job.
*
- * @param projectId the project id
* @param jobId the job id
* @param dataflowOptions used to configure the client for the Dataflow Service
* @param aggregatorTransforms a mapping from aggregators to PTransforms
*/
public DataflowPipelineJob(
- String projectId,
String jobId,
DataflowPipelineOptions dataflowOptions,
DataflowAggregatorTransforms aggregatorTransforms) {
- this.projectId = projectId;
this.jobId = jobId;
this.dataflowOptions = dataflowOptions;
this.aggregatorTransforms = aggregatorTransforms;
@@ -146,7 +138,7 @@ public class DataflowPipelineJob implements PipelineResult {
* Get the project this job exists in.
*/
public String getProjectId() {
- return projectId;
+ return dataflowOptions.getProject();
}
/**
@@ -249,7 +241,7 @@ public class DataflowPipelineJob implements PipelineResult {
MonitoringUtil.JobMessagesHandler messageHandler,
Sleeper sleeper,
NanoClock nanoClock) throws IOException, InterruptedException {
- MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
+ MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient());
long lastTimestamp = 0;
BackOff backoff;
@@ -338,12 +330,12 @@ public class DataflowPipelineJob implements PipelineResult {
@Override
public State cancel() throws IOException {
Job content = new Job();
- content.setProjectId(projectId);
+ content.setProjectId(getProjectId());
content.setId(jobId);
content.setRequestedState("JOB_STATE_CANCELLED");
try {
dataflowOptions.getDataflowClient().projects().jobs()
- .update(projectId, jobId, content)
+ .update(getProjectId(), jobId, content)
.execute();
return State.CANCELLED;
} catch (IOException e) {
@@ -412,13 +404,13 @@ public class DataflowPipelineJob implements PipelineResult {
Job job = dataflowOptions.getDataflowClient()
.projects()
.jobs()
- .get(projectId, jobId)
+ .get(getProjectId(), jobId)
.execute();
State currentState = MonitoringUtil.toState(job.getCurrentState());
if (currentState.isTerminal()) {
terminalState = currentState;
replacedByJob = new DataflowPipelineJob(
- getProjectId(), job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
+ job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
}
return job;
} catch (IOException exn) {
@@ -485,7 +477,7 @@ public class DataflowPipelineJob implements PipelineResult {
} else {
boolean terminal = getState().isTerminal();
JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
- .projects().jobs().getMetrics(projectId, jobId).execute();
+ .projects().jobs().getMetrics(getProjectId(), jobId).execute();
metricUpdates = jobMetrics.getMetrics();
if (terminal && jobMetrics.getMetrics() != null) {
terminalMetricUpdates = metricUpdates;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 339771b..e781b4e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -629,8 +629,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Use a raw client for post-launch monitoring, as status calls may fail
// regularly and need not be retried automatically.
- DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(
- options.getProject(), jobResult.getId(), options, aggregatorTransforms);
+ DataflowPipelineJob dataflowPipelineJob =
+ new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms);
// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
index 2937184..1a44963 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob {
"The result of template creation should not be used.";
public DataflowTemplateJob() {
- super(null, null, null, null);
+ super(null, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 0527b7c..323f762 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -115,6 +115,7 @@ public class DataflowPipelineJobTest {
options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
options.setDataflowClient(mockWorkflowClient);
+ options.setProject(PROJECT_ID);
}
/**
@@ -160,8 +161,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
State state = job.waitUntilFinish(
Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
@@ -182,8 +183,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
}
@@ -249,8 +250,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
@@ -269,8 +270,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
assertEquals(null, state);
@@ -294,7 +295,7 @@ public class DataflowPipelineJobTest {
FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ JOB_ID, options, dataflowAggregatorTransforms);
long startTime = clock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
assertEquals(null, state);
@@ -317,7 +318,7 @@ public class DataflowPipelineJobTest {
mock(DataflowAggregatorTransforms.class);
DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ JOB_ID, options, dataflowAggregatorTransforms);
assertEquals(
State.RUNNING,
@@ -333,8 +334,8 @@ public class DataflowPipelineJobTest {
DataflowAggregatorTransforms dataflowAggregatorTransforms =
mock(DataflowAggregatorTransforms.class);
- DataflowPipelineJob job = new DataflowPipelineJob(
- PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
long startTime = fastClock.nanoTime();
assertEquals(
@@ -373,7 +374,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<?> values = job.getAggregatorValues(aggregator);
@@ -408,7 +409,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<?> values = job.getAggregatorValues(aggregator);
@@ -453,8 +454,7 @@ public class DataflowPipelineJobTest {
when(getState.execute()).thenReturn(modelJob);
modelJob.setCurrentState(State.RUNNING.toString());
- DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -521,8 +521,7 @@ public class DataflowPipelineJobTest {
when(getState.execute()).thenReturn(modelJob);
modelJob.setCurrentState(State.RUNNING.toString());
- DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -571,7 +570,7 @@ public class DataflowPipelineJobTest {
modelJob.setCurrentState(State.RUNNING.toString());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
@@ -589,7 +588,7 @@ public class DataflowPipelineJobTest {
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("not used in this pipeline");
@@ -624,8 +623,7 @@ public class DataflowPipelineJobTest {
when(getState.execute()).thenReturn(modelJob);
modelJob.setCurrentState(State.RUNNING.toString());
- DataflowPipelineJob job =
- new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
thrown.expect(AggregatorRetrievalException.class);
thrown.expectCause(is(cause));
@@ -690,7 +688,7 @@ public class DataflowPipelineJobTest {
when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
when(update.execute()).thenReturn(new Job());
- DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
assertEquals(State.CANCELLED, job.cancel());
Job content = new Job();
@@ -714,7 +712,7 @@ public class DataflowPipelineJobTest {
when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
when(update.execute()).thenThrow(new IOException());
- DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
thrown.expect(IOException.class);
thrown.expectMessage("Failed to cancel the job, "
@@ -742,7 +740,7 @@ public class DataflowPipelineJobTest {
when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
when(update.execute()).thenThrow(new IOException());
- DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+ DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
assertEquals(State.FAILED, job.cancel());
Job content = new Job();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index e6b513a..366c6a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -344,8 +344,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -359,8 +358,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessWhenPAssertFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -374,8 +372,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -389,8 +386,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -403,8 +399,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -417,8 +412,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -431,8 +425,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -446,8 +439,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -461,8 +453,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -476,8 +467,7 @@ public class TestDataflowRunnerTest {
@Test
public void testStreamingPipelineFailsIfServiceFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -532,8 +522,7 @@ public class TestDataflowRunnerTest {
@Test
public void testGetJobMetricsThatSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -549,8 +538,7 @@ public class TestDataflowRunnerTest {
@Test
public void testGetJobMetricsThatFailsForException() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));