You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/12/07 01:15:45 UTC

[3/4] incubator-beam git commit: [Code Health] Remove redundant projectId from DataflowPipelineJob.

[Code Health] Remove redundant projectId from DataflowPipelineJob.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ded58832
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ded58832
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ded58832

Branch: refs/heads/master
Commit: ded58832ceaef487f4590d9396f09744288c955d
Parents: afedd68
Author: Pei He <pe...@google.com>
Authored: Wed Nov 23 16:14:27 2016 -0800
Committer: bchambers <bc...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   | 22 +++------
 .../beam/runners/dataflow/DataflowRunner.java   |  4 +-
 .../dataflow/util/DataflowTemplateJob.java      |  2 +-
 .../dataflow/DataflowPipelineJobTest.java       | 48 ++++++++++----------
 .../testing/TestDataflowRunnerTest.java         | 36 +++++----------
 5 files changed, 45 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index a2b632f..58e85e0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -62,11 +62,6 @@ public class DataflowPipelineJob implements PipelineResult {
   private String jobId;
 
   /**
-   * Google cloud project to associate this pipeline with.
-   */
-  private String projectId;
-
-  /**
    * Client for the Dataflow service. This can be used to query the service
    * for information about the job.
    */
@@ -119,17 +114,14 @@ public class DataflowPipelineJob implements PipelineResult {
   /**
    * Constructs the job.
    *
-   * @param projectId the project id
    * @param jobId the job id
    * @param dataflowOptions used to configure the client for the Dataflow Service
    * @param aggregatorTransforms a mapping from aggregators to PTransforms
    */
   public DataflowPipelineJob(
-      String projectId,
       String jobId,
       DataflowPipelineOptions dataflowOptions,
       DataflowAggregatorTransforms aggregatorTransforms) {
-    this.projectId = projectId;
     this.jobId = jobId;
     this.dataflowOptions = dataflowOptions;
     this.aggregatorTransforms = aggregatorTransforms;
@@ -146,7 +138,7 @@ public class DataflowPipelineJob implements PipelineResult {
    * Get the project this job exists in.
    */
   public String getProjectId() {
-    return projectId;
+    return dataflowOptions.getProject();
   }
 
   /**
@@ -249,7 +241,7 @@ public class DataflowPipelineJob implements PipelineResult {
       MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
       NanoClock nanoClock) throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient());
+    MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient());
 
     long lastTimestamp = 0;
     BackOff backoff;
@@ -338,12 +330,12 @@ public class DataflowPipelineJob implements PipelineResult {
   @Override
   public State cancel() throws IOException {
     Job content = new Job();
-    content.setProjectId(projectId);
+    content.setProjectId(getProjectId());
     content.setId(jobId);
     content.setRequestedState("JOB_STATE_CANCELLED");
     try {
       dataflowOptions.getDataflowClient().projects().jobs()
-          .update(projectId, jobId, content)
+          .update(getProjectId(), jobId, content)
           .execute();
       return State.CANCELLED;
     } catch (IOException e) {
@@ -412,13 +404,13 @@ public class DataflowPipelineJob implements PipelineResult {
         Job job = dataflowOptions.getDataflowClient()
             .projects()
             .jobs()
-            .get(projectId, jobId)
+            .get(getProjectId(), jobId)
             .execute();
         State currentState = MonitoringUtil.toState(job.getCurrentState());
         if (currentState.isTerminal()) {
           terminalState = currentState;
           replacedByJob = new DataflowPipelineJob(
-              getProjectId(), job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
+              job.getReplacedByJobId(), dataflowOptions, aggregatorTransforms);
         }
         return job;
       } catch (IOException exn) {
@@ -485,7 +477,7 @@ public class DataflowPipelineJob implements PipelineResult {
       } else {
         boolean terminal = getState().isTerminal();
         JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
-            .projects().jobs().getMetrics(projectId, jobId).execute();
+            .projects().jobs().getMetrics(getProjectId(), jobId).execute();
         metricUpdates = jobMetrics.getMetrics();
         if (terminal && jobMetrics.getMetrics() != null) {
           terminalMetricUpdates = metricUpdates;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 339771b..e781b4e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -629,8 +629,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     // Use a raw client for post-launch monitoring, as status calls may fail
     // regularly and need not be retried automatically.
-    DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(
-        options.getProject(), jobResult.getId(), options, aggregatorTransforms);
+    DataflowPipelineJob dataflowPipelineJob =
+        new DataflowPipelineJob(jobResult.getId(), options, aggregatorTransforms);
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
index 2937184..1a44963 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob {
       "The result of template creation should not be used.";
 
   public DataflowTemplateJob() {
-    super(null, null, null, null);
+    super(null, null, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 0527b7c..323f762 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -115,6 +115,7 @@ public class DataflowPipelineJobTest {
 
     options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
     options.setDataflowClient(mockWorkflowClient);
+    options.setProject(PROJECT_ID);
   }
 
   /**
@@ -160,8 +161,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     State state = job.waitUntilFinish(
         Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
@@ -182,8 +183,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
   }
@@ -249,8 +250,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     long startTime = fastClock.nanoTime();
     State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
@@ -269,8 +270,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
     long startTime = fastClock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
     assertEquals(null, state);
@@ -294,7 +295,7 @@ public class DataflowPipelineJobTest {
     FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+        JOB_ID, options, dataflowAggregatorTransforms);
     long startTime = clock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
     assertEquals(null, state);
@@ -317,7 +318,7 @@ public class DataflowPipelineJobTest {
         mock(DataflowAggregatorTransforms.class);
 
     DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+        JOB_ID, options, dataflowAggregatorTransforms);
 
     assertEquals(
         State.RUNNING,
@@ -333,8 +334,8 @@ public class DataflowPipelineJobTest {
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
 
-    DataflowPipelineJob job = new DataflowPipelineJob(
-        PROJECT_ID, JOB_ID, options, dataflowAggregatorTransforms);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(JOB_ID, options, dataflowAggregatorTransforms);
 
     long startTime = fastClock.nanoTime();
     assertEquals(
@@ -373,7 +374,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<?> values = job.getAggregatorValues(aggregator);
 
@@ -408,7 +409,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<?> values = job.getAggregatorValues(aggregator);
 
@@ -453,8 +454,7 @@ public class DataflowPipelineJobTest {
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
 
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -521,8 +521,7 @@ public class DataflowPipelineJobTest {
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
 
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -571,7 +570,7 @@ public class DataflowPipelineJobTest {
     modelJob.setCurrentState(State.RUNNING.toString());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     AggregatorValues<Long> values = job.getAggregatorValues(aggregator);
 
@@ -589,7 +588,7 @@ public class DataflowPipelineJobTest {
         ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+        new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("not used in this pipeline");
@@ -624,8 +623,7 @@ public class DataflowPipelineJobTest {
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
 
-    DataflowPipelineJob job =
-        new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, aggregatorTransforms);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, aggregatorTransforms);
 
     thrown.expect(AggregatorRetrievalException.class);
     thrown.expectCause(is(cause));
@@ -690,7 +688,7 @@ public class DataflowPipelineJobTest {
     when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
     when(update.execute()).thenReturn(new Job());
 
-    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
     assertEquals(State.CANCELLED, job.cancel());
     Job content = new Job();
@@ -714,7 +712,7 @@ public class DataflowPipelineJobTest {
     when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
     when(update.execute()).thenThrow(new IOException());
 
-    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
     thrown.expect(IOException.class);
     thrown.expectMessage("Failed to cancel the job, "
@@ -742,7 +740,7 @@ public class DataflowPipelineJobTest {
     when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
     when(update.execute()).thenThrow(new IOException());
 
-    DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, options, null);
+    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
 
     assertEquals(State.FAILED, job.cancel());
     Job content = new Job();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ded58832/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index e6b513a..366c6a1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -344,8 +344,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -359,8 +358,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckingForSuccessWhenPAssertFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -374,8 +372,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -389,8 +386,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -403,8 +399,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -417,8 +412,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -431,8 +425,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -446,8 +439,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -461,8 +453,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -476,8 +467,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -532,8 +522,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testGetJobMetricsThatSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -549,8 +538,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testGetJobMetricsThatFailsForException() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));