You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/01 18:54:12 UTC

[1/2] beam git commit: Do not repeat log messages in DataflowPipelineJob

Repository: beam
Updated Branches:
  refs/heads/master 87236ce2f -> 3fff52d43


Do not repeat log messages in DataflowPipelineJob


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/401eef09
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/401eef09
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/401eef09

Branch: refs/heads/master
Commit: 401eef09a0dc94dc2ed1e4afcaa30259d922ba3f
Parents: 9f2733a
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Apr 30 21:24:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Apr 30 21:24:23 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   |  37 +++++-
 .../beam/runners/dataflow/DataflowRunner.java   |   6 +-
 .../dataflow/util/DataflowTemplateJob.java      |   2 +-
 .../runners/dataflow/util/MonitoringUtil.java   |   4 +-
 .../dataflow/DataflowPipelineJobTest.java       | 127 +++++++++++++++++--
 .../testing/TestDataflowRunnerTest.java         |  14 +-
 6 files changed, 162 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index d464206..aef3155 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -100,6 +100,11 @@ public class DataflowPipelineJob implements PipelineResult {
   private List<MetricUpdate> terminalMetricUpdates;
 
   /**
+   * The latest timestamp up to which job messages have been retrieved.
+   */
+  private long lastTimestamp = Long.MIN_VALUE;
+
+  /**
    * The polling interval for job status and messages information.
    */
   static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2);
@@ -132,12 +137,13 @@ public class DataflowPipelineJob implements PipelineResult {
    * @param transformStepNames a mapping from AppliedPTransforms to Step Names
    */
   public DataflowPipelineJob(
+      DataflowClient dataflowClient,
       String jobId,
       DataflowPipelineOptions dataflowOptions,
       Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+    this.dataflowClient = dataflowClient;
     this.jobId = jobId;
     this.dataflowOptions = dataflowOptions;
-    this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
     this.transformStepNames = HashBiMap.create(
         firstNonNull(transformStepNames, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()));
     this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
@@ -183,7 +189,8 @@ public class DataflowPipelineJob implements PipelineResult {
   @Nullable
   public State waitUntilFinish(Duration duration) {
     try {
-      return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
+      return waitUntilFinish(
+          duration, new MonitoringUtil.LoggingHandler());
     } catch (Exception e) {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
@@ -229,12 +236,29 @@ public class DataflowPipelineJob implements PipelineResult {
 
     try {
       Runtime.getRuntime().addShutdownHook(shutdownHook);
-      return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+      return waitUntilFinish(
+          duration,
+          messageHandler,
+          Sleeper.DEFAULT,
+          NanoClock.SYSTEM,
+          new MonitoringUtil(dataflowClient));
     } finally {
       Runtime.getRuntime().removeShutdownHook(shutdownHook);
     }
   }
 
+  @Nullable
+  @VisibleForTesting
+  State waitUntilFinish(
+      Duration duration,
+      @Nullable MonitoringUtil.JobMessagesHandler messageHandler,
+      Sleeper sleeper,
+      NanoClock nanoClock)
+      throws IOException, InterruptedException {
+    return waitUntilFinish(
+        duration, messageHandler, sleeper, nanoClock, new MonitoringUtil(dataflowClient));
+  }
+
   /**
    * Waits until the pipeline finishes and returns the final status.
    *
@@ -256,10 +280,9 @@ public class DataflowPipelineJob implements PipelineResult {
       Duration duration,
       @Nullable MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
-      NanoClock nanoClock) throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(dataflowClient);
+      NanoClock nanoClock,
+      MonitoringUtil monitor) throws IOException, InterruptedException {
 
-    long lastTimestamp = 0;
     BackOff backoff;
     if (!duration.isLongerThan(Duration.ZERO)) {
       backoff = MESSAGES_BACKOFF_FACTORY.backoff();
@@ -460,7 +483,7 @@ public class DataflowPipelineJob implements PipelineResult {
         if (currentState.isTerminal()) {
           terminalState = currentState;
           replacedByJob = new DataflowPipelineJob(
-              job.getReplacedByJobId(), dataflowOptions, transformStepNames);
+              dataflowClient, job.getReplacedByJobId(), dataflowOptions, transformStepNames);
         }
         return job;
       } catch (IOException exn) {

http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a61fe49..12d10de 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -621,7 +621,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     // Use a raw client for post-launch monitoring, as status calls may fail
     // regularly and need not be retried automatically.
     DataflowPipelineJob dataflowPipelineJob =
-        new DataflowPipelineJob(jobResult.getId(), options, jobSpecification.getStepNames());
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            jobResult.getId(),
+            options,
+            jobSpecification.getStepNames());
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
index 1a44963..2937184 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob {
       "The result of template creation should not be used.";
 
   public DataflowTemplateJob() {
-    super(null, null, null);
+    super(null, null, null, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index c410afb..759387c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A helper class for monitoring jobs submitted to the service.
  */
-public final class MonitoringUtil {
+public class MonitoringUtil {
 
   private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud beta dataflow";
   private static final String ENDPOINT_OVERRIDE_ENV_VAR =
@@ -147,7 +147,7 @@ public final class MonitoringUtil {
    *   timestamp greater than this value.
    * @return collection of messages
    */
-  public ArrayList<JobMessage> getJobMessages(
+  public List<JobMessage> getJobMessages(
       String jobId, long startTimestampMs) throws IOException {
     // TODO: Allow filtering messages by importance
     Instant startTimestamp = new Instant(startTimestampMs);

http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index e1235b9..237493a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -23,6 +23,8 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -35,14 +37,20 @@ import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
 import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -57,6 +65,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -77,6 +86,8 @@ public class DataflowPipelineJobTest {
   private static final String REPLACEMENT_JOB_ID = "4321";
 
   @Mock
+  private DataflowClient mockDataflowClient;
+  @Mock
   private Dataflow mockWorkflowClient;
   @Mock
   private Dataflow.Projects mockProjects;
@@ -84,6 +95,8 @@ public class DataflowPipelineJobTest {
   private Dataflow.Projects.Locations mockLocations;
   @Mock
   private Dataflow.Projects.Locations.Jobs mockJobs;
+  @Mock
+  private MonitoringUtil.JobMessagesHandler mockHandler;
   @Rule
   public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
 
@@ -157,7 +170,10 @@ public class DataflowPipelineJobTest {
     when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options,
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            JOB_ID,
+            options,
             ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     State state = job.waitUntilFinish(
@@ -179,7 +195,10 @@ public class DataflowPipelineJobTest {
     when(statusRequest.execute()).thenReturn(statusResponse);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options,
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            JOB_ID,
+            options,
             ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
@@ -246,7 +265,10 @@ public class DataflowPipelineJobTest {
     when(statusRequest.execute()).thenThrow(IOException.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options,
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            JOB_ID,
+            options,
             ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     long startTime = fastClock.nanoTime();
@@ -266,7 +288,10 @@ public class DataflowPipelineJobTest {
     when(statusRequest.execute()).thenThrow(IOException.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options,
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            JOB_ID,
+            options,
             ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
     long startTime = fastClock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
@@ -289,7 +314,10 @@ public class DataflowPipelineJobTest {
     FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options,
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            JOB_ID,
+            options,
             ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
     long startTime = clock.nanoTime();
     State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
@@ -311,7 +339,10 @@ public class DataflowPipelineJobTest {
     when(statusRequest.execute()).thenReturn(statusResponse);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options,
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            JOB_ID,
+            options,
             ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     assertEquals(
@@ -328,7 +359,10 @@ public class DataflowPipelineJobTest {
     when(statusRequest.execute()).thenThrow(IOException.class);
 
     DataflowPipelineJob job =
-        new DataflowPipelineJob(JOB_ID, options,
+        new DataflowPipelineJob(
+            DataflowClient.create(options),
+            JOB_ID,
+            options,
             ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
 
     long startTime = fastClock.nanoTime();
@@ -379,7 +413,8 @@ public class DataflowPipelineJobTest {
         .thenReturn(update);
     when(update.execute()).thenReturn(new Job().setCurrentState("JOB_STATE_CANCELLED"));
 
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
 
     assertEquals(State.CANCELLED, job.cancel());
     Job content = new Job();
@@ -406,7 +441,8 @@ public class DataflowPipelineJobTest {
         .thenReturn(update);
     when(update.execute()).thenThrow(new IOException("Some random IOException"));
 
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
 
     thrown.expect(IOException.class);
     thrown.expectMessage("Failed to cancel job in state RUNNING, "
@@ -436,7 +472,8 @@ public class DataflowPipelineJobTest {
         .thenReturn(update);
     when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS"));
 
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
     State returned = job.cancel();
     assertThat(returned, equalTo(State.RUNNING));
     expectedLogs.verifyWarn("Cancel failed because job is already terminated.");
@@ -458,7 +495,8 @@ public class DataflowPipelineJobTest {
         .thenReturn(update);
     when(update.execute()).thenThrow(new IOException());
 
-    DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+    DataflowPipelineJob job =
+        new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
 
     assertEquals(State.FAILED, job.cancel());
     Job content = new Job();
@@ -469,4 +507,71 @@ public class DataflowPipelineJobTest {
     verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID);
     verifyNoMoreInteractions(mockJobs);
   }
+
+  /**
+   * Tests that a {@link DataflowPipelineJob} does not duplicate messages.
+   */
+  @Test
+  public void testWaitUntilFinishNoRepeatedLogs() throws Exception {
+    DataflowPipelineJob job = new DataflowPipelineJob(mockDataflowClient, JOB_ID, options, null);
+    Sleeper sleeper = new ZeroSleeper();
+    NanoClock nanoClock = mock(NanoClock.class);
+
+    Instant separatingTimestamp = new Instant(42L);
+    JobMessage theMessage = infoMessage(separatingTimestamp, "nothing");
+
+    MonitoringUtil mockMonitor = mock(MonitoringUtil.class);
+    when(mockMonitor.getJobMessages(anyString(), anyLong()))
+        .thenReturn(ImmutableList.of(theMessage));
+
+    // The Job just always reports "running" across all calls
+    Job fakeJob = new Job();
+    fakeJob.setCurrentState("JOB_STATE_RUNNING");
+    when(mockDataflowClient.getJob(anyString())).thenReturn(fakeJob);
+
+    // After waitUntilFinish the DataflowPipelineJob should record the latest message timestamp
+    when(nanoClock.nanoTime()).thenReturn(0L).thenReturn(2000000000L);
+    job.waitUntilFinish(Duration.standardSeconds(1), mockHandler, sleeper, nanoClock, mockMonitor);
+    verify(mockHandler).process(ImmutableList.of(theMessage));
+
+    // Second waitUntilFinish should request job messages starting at `separatingTimestamp` so
+    // the monitor will only return new messages
+    when(nanoClock.nanoTime()).thenReturn(3000000000L).thenReturn(6000000000L);
+    job.waitUntilFinish(Duration.standardSeconds(1), mockHandler, sleeper, nanoClock, mockMonitor);
+    verify(mockMonitor).getJobMessages(anyString(), eq(separatingTimestamp.getMillis()));
+  }
+
+  private static JobMessage infoMessage(Instant timestamp, String text) {
+    JobMessage message = new JobMessage();
+    message.setTime(TimeUtil.toCloudTime(timestamp));
+    message.setMessageText(text);
+    return message;
+  }
+
+  private class FakeMonitor extends MonitoringUtil {
+    // Messages in timestamp order
+    private final NavigableMap<Long, JobMessage> timestampedMessages;
+
+    public FakeMonitor(JobMessage... messages) {
+      // The client should never be used; this Fake is intended to intercept relevant methods
+      super(mockDataflowClient);
+
+      NavigableMap<Long, JobMessage> timestampedMessages = Maps.newTreeMap();
+      for (JobMessage message : messages) {
+        timestampedMessages.put(Long.parseLong(message.getTime()), message);
+      }
+
+      this.timestampedMessages = timestampedMessages;
+    }
+
+    @Override
+    public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) {
+      return ImmutableList.copyOf(timestampedMessages.headMap(startTimestampMs).values());
+    }
+  }
+
+  private static class ZeroSleeper implements Sleeper {
+    @Override
+    public void sleep(long l) throws InterruptedException {}
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 80fbfe5..54eb88d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -307,7 +307,8 @@ public class TestDataflowRunnerTest {
    */
   @Test
   public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -326,7 +327,8 @@ public class TestDataflowRunnerTest {
    */
   @Test
   public void testCheckingForSuccessWhenPAssertFails() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -342,7 +344,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -363,7 +365,7 @@ public class TestDataflowRunnerTest {
    */
   @Test
   public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -422,7 +424,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testGetJobMetricsThatSucceeds() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
@@ -438,7 +440,7 @@ public class TestDataflowRunnerTest {
 
   @Test
   public void testGetJobMetricsThatFailsForException() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 


[2/2] beam git commit: This closes #2796: Do not repeat log messages in DataflowPipelineJob

Posted by ke...@apache.org.
This closes #2796: Do not repeat log messages in DataflowPipelineJob


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3fff52d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3fff52d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3fff52d4

Branch: refs/heads/master
Commit: 3fff52d43cbdc3b34f7d3e2ed7ac62f5ff06ccb6
Parents: 87236ce 401eef0
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 1 11:53:48 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 1 11:53:48 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowPipelineJob.java   |  37 +++++-
 .../beam/runners/dataflow/DataflowRunner.java   |   6 +-
 .../dataflow/util/DataflowTemplateJob.java      |   2 +-
 .../runners/dataflow/util/MonitoringUtil.java   |   4 +-
 .../dataflow/DataflowPipelineJobTest.java       | 127 +++++++++++++++++--
 .../testing/TestDataflowRunnerTest.java         |  14 +-
 6 files changed, 162 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3fff52d4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------