You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/01 18:54:12 UTC
[1/2] beam git commit: Do not repeat log messages in DataflowPipelineJob
Repository: beam
Updated Branches:
refs/heads/master 87236ce2f -> 3fff52d43
Do not repeat log messages in DataflowPipelineJob
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/401eef09
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/401eef09
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/401eef09
Branch: refs/heads/master
Commit: 401eef09a0dc94dc2ed1e4afcaa30259d922ba3f
Parents: 9f2733a
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Apr 30 21:24:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Apr 30 21:24:23 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 37 +++++-
.../beam/runners/dataflow/DataflowRunner.java | 6 +-
.../dataflow/util/DataflowTemplateJob.java | 2 +-
.../runners/dataflow/util/MonitoringUtil.java | 4 +-
.../dataflow/DataflowPipelineJobTest.java | 127 +++++++++++++++++--
.../testing/TestDataflowRunnerTest.java | 14 +-
6 files changed, 162 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index d464206..aef3155 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -100,6 +100,11 @@ public class DataflowPipelineJob implements PipelineResult {
private List<MetricUpdate> terminalMetricUpdates;
/**
+ * The latest timestamp up to which job messages have been retrieved.
+ */
+ private long lastTimestamp = Long.MIN_VALUE;
+
+ /**
* The polling interval for job status and messages information.
*/
static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2);
@@ -132,12 +137,13 @@ public class DataflowPipelineJob implements PipelineResult {
* @param transformStepNames a mapping from AppliedPTransforms to Step Names
*/
public DataflowPipelineJob(
+ DataflowClient dataflowClient,
String jobId,
DataflowPipelineOptions dataflowOptions,
Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+ this.dataflowClient = dataflowClient;
this.jobId = jobId;
this.dataflowOptions = dataflowOptions;
- this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions));
this.transformStepNames = HashBiMap.create(
firstNonNull(transformStepNames, ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of()));
this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
@@ -183,7 +189,8 @@ public class DataflowPipelineJob implements PipelineResult {
@Nullable
public State waitUntilFinish(Duration duration) {
try {
- return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
+ return waitUntilFinish(
+ duration, new MonitoringUtil.LoggingHandler());
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -229,12 +236,29 @@ public class DataflowPipelineJob implements PipelineResult {
try {
Runtime.getRuntime().addShutdownHook(shutdownHook);
- return waitUntilFinish(duration, messageHandler, Sleeper.DEFAULT, NanoClock.SYSTEM);
+ return waitUntilFinish(
+ duration,
+ messageHandler,
+ Sleeper.DEFAULT,
+ NanoClock.SYSTEM,
+ new MonitoringUtil(dataflowClient));
} finally {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
}
+ @Nullable
+ @VisibleForTesting
+ State waitUntilFinish(
+ Duration duration,
+ @Nullable MonitoringUtil.JobMessagesHandler messageHandler,
+ Sleeper sleeper,
+ NanoClock nanoClock)
+ throws IOException, InterruptedException {
+ return waitUntilFinish(
+ duration, messageHandler, sleeper, nanoClock, new MonitoringUtil(dataflowClient));
+ }
+
/**
* Waits until the pipeline finishes and returns the final status.
*
@@ -256,10 +280,9 @@ public class DataflowPipelineJob implements PipelineResult {
Duration duration,
@Nullable MonitoringUtil.JobMessagesHandler messageHandler,
Sleeper sleeper,
- NanoClock nanoClock) throws IOException, InterruptedException {
- MonitoringUtil monitor = new MonitoringUtil(dataflowClient);
+ NanoClock nanoClock,
+ MonitoringUtil monitor) throws IOException, InterruptedException {
- long lastTimestamp = 0;
BackOff backoff;
if (!duration.isLongerThan(Duration.ZERO)) {
backoff = MESSAGES_BACKOFF_FACTORY.backoff();
@@ -460,7 +483,7 @@ public class DataflowPipelineJob implements PipelineResult {
if (currentState.isTerminal()) {
terminalState = currentState;
replacedByJob = new DataflowPipelineJob(
- job.getReplacedByJobId(), dataflowOptions, transformStepNames);
+ dataflowClient, job.getReplacedByJobId(), dataflowOptions, transformStepNames);
}
return job;
} catch (IOException exn) {
http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a61fe49..12d10de 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -621,7 +621,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Use a raw client for post-launch monitoring, as status calls may fail
// regularly and need not be retried automatically.
DataflowPipelineJob dataflowPipelineJob =
- new DataflowPipelineJob(jobResult.getId(), options, jobSpecification.getStepNames());
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ jobResult.getId(),
+ options,
+ jobSpecification.getStepNames());
// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same
http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
index 1a44963..2937184 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -30,7 +30,7 @@ public class DataflowTemplateJob extends DataflowPipelineJob {
"The result of template creation should not be used.";
public DataflowTemplateJob() {
- super(null, null, null);
+ super(null, null, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index c410afb..759387c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
/**
* A helper class for monitoring jobs submitted to the service.
*/
-public final class MonitoringUtil {
+public class MonitoringUtil {
private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud beta dataflow";
private static final String ENDPOINT_OVERRIDE_ENV_VAR =
@@ -147,7 +147,7 @@ public final class MonitoringUtil {
* timestamp greater than this value.
* @return collection of messages
*/
- public ArrayList<JobMessage> getJobMessages(
+ public List<JobMessage> getJobMessages(
String jobId, long startTimestampMs) throws IOException {
// TODO: Allow filtering messages by importance
Instant startTimestamp = new Instant(startTimestampMs);
http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index e1235b9..237493a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -23,6 +23,8 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
@@ -35,14 +37,20 @@ import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -57,6 +65,7 @@ import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -77,6 +86,8 @@ public class DataflowPipelineJobTest {
private static final String REPLACEMENT_JOB_ID = "4321";
@Mock
+ private DataflowClient mockDataflowClient;
+ @Mock
private Dataflow mockWorkflowClient;
@Mock
private Dataflow.Projects mockProjects;
@@ -84,6 +95,8 @@ public class DataflowPipelineJobTest {
private Dataflow.Projects.Locations mockLocations;
@Mock
private Dataflow.Projects.Locations.Jobs mockJobs;
+ @Mock
+ private MonitoringUtil.JobMessagesHandler mockHandler;
@Rule
public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
@@ -157,7 +170,10 @@ public class DataflowPipelineJobTest {
when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options,
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ JOB_ID,
+ options,
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
State state = job.waitUntilFinish(
@@ -179,7 +195,10 @@ public class DataflowPipelineJobTest {
when(statusRequest.execute()).thenReturn(statusResponse);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options,
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ JOB_ID,
+ options,
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
@@ -246,7 +265,10 @@ public class DataflowPipelineJobTest {
when(statusRequest.execute()).thenThrow(IOException.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options,
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ JOB_ID,
+ options,
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = fastClock.nanoTime();
@@ -266,7 +288,10 @@ public class DataflowPipelineJobTest {
when(statusRequest.execute()).thenThrow(IOException.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options,
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ JOB_ID,
+ options,
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = fastClock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
@@ -289,7 +314,10 @@ public class DataflowPipelineJobTest {
FastNanoClockAndFuzzySleeper clock = new FastNanoClockAndFuzzySleeper();
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options,
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ JOB_ID,
+ options,
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = clock.nanoTime();
State state = job.waitUntilFinish(Duration.millis(4), null, clock, clock);
@@ -311,7 +339,10 @@ public class DataflowPipelineJobTest {
when(statusRequest.execute()).thenReturn(statusResponse);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options,
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ JOB_ID,
+ options,
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
assertEquals(
@@ -328,7 +359,10 @@ public class DataflowPipelineJobTest {
when(statusRequest.execute()).thenThrow(IOException.class);
DataflowPipelineJob job =
- new DataflowPipelineJob(JOB_ID, options,
+ new DataflowPipelineJob(
+ DataflowClient.create(options),
+ JOB_ID,
+ options,
ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of());
long startTime = fastClock.nanoTime();
@@ -379,7 +413,8 @@ public class DataflowPipelineJobTest {
.thenReturn(update);
when(update.execute()).thenReturn(new Job().setCurrentState("JOB_STATE_CANCELLED"));
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
assertEquals(State.CANCELLED, job.cancel());
Job content = new Job();
@@ -406,7 +441,8 @@ public class DataflowPipelineJobTest {
.thenReturn(update);
when(update.execute()).thenThrow(new IOException("Some random IOException"));
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
thrown.expect(IOException.class);
thrown.expectMessage("Failed to cancel job in state RUNNING, "
@@ -436,7 +472,8 @@ public class DataflowPipelineJobTest {
.thenReturn(update);
when(update.execute()).thenThrow(new IOException("Job has terminated in state SUCCESS"));
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
State returned = job.cancel();
assertThat(returned, equalTo(State.RUNNING));
expectedLogs.verifyWarn("Cancel failed because job is already terminated.");
@@ -458,7 +495,8 @@ public class DataflowPipelineJobTest {
.thenReturn(update);
when(update.execute()).thenThrow(new IOException());
- DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
+ DataflowPipelineJob job =
+ new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null);
assertEquals(State.FAILED, job.cancel());
Job content = new Job();
@@ -469,4 +507,71 @@ public class DataflowPipelineJobTest {
verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID);
verifyNoMoreInteractions(mockJobs);
}
+
+ /**
+ * Tests that a {@link DataflowPipelineJob} does not duplicate messages.
+ */
+ @Test
+ public void testWaitUntilFinishNoRepeatedLogs() throws Exception {
+ DataflowPipelineJob job = new DataflowPipelineJob(mockDataflowClient, JOB_ID, options, null);
+ Sleeper sleeper = new ZeroSleeper();
+ NanoClock nanoClock = mock(NanoClock.class);
+
+ Instant separatingTimestamp = new Instant(42L);
+ JobMessage theMessage = infoMessage(separatingTimestamp, "nothing");
+
+ MonitoringUtil mockMonitor = mock(MonitoringUtil.class);
+ when(mockMonitor.getJobMessages(anyString(), anyLong()))
+ .thenReturn(ImmutableList.of(theMessage));
+
+ // The Job just always reports "running" across all calls
+ Job fakeJob = new Job();
+ fakeJob.setCurrentState("JOB_STATE_RUNNING");
+ when(mockDataflowClient.getJob(anyString())).thenReturn(fakeJob);
+
+ // After waitUntilFinish the DataflowPipelineJob should record the latest message timestamp
+ when(nanoClock.nanoTime()).thenReturn(0L).thenReturn(2000000000L);
+ job.waitUntilFinish(Duration.standardSeconds(1), mockHandler, sleeper, nanoClock, mockMonitor);
+ verify(mockHandler).process(ImmutableList.of(theMessage));
+
+ // Second waitUntilFinish should request jobs with `separatingTimestamp` so the monitor
+ // will only return new messages
+ when(nanoClock.nanoTime()).thenReturn(3000000000L).thenReturn(6000000000L);
+ job.waitUntilFinish(Duration.standardSeconds(1), mockHandler, sleeper, nanoClock, mockMonitor);
+ verify(mockMonitor).getJobMessages(anyString(), eq(separatingTimestamp.getMillis()));
+ }
+
+ private static JobMessage infoMessage(Instant timestamp, String text) {
+ JobMessage message = new JobMessage();
+ message.setTime(TimeUtil.toCloudTime(timestamp));
+ message.setMessageText(text);
+ return message;
+ }
+
+ private class FakeMonitor extends MonitoringUtil {
+ // Messages in timestamp order
+ private final NavigableMap<Long, JobMessage> timestampedMessages;
+
+ public FakeMonitor(JobMessage... messages) {
+ // The client should never be used; this Fake is intended to intercept relevant methods
+ super(mockDataflowClient);
+
+ NavigableMap<Long, JobMessage> timestampedMessages = Maps.newTreeMap();
+ for (JobMessage message : messages) {
+ timestampedMessages.put(Long.parseLong(message.getTime()), message);
+ }
+
+ this.timestampedMessages = timestampedMessages;
+ }
+
+ @Override
+ public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) {
+ return ImmutableList.copyOf(timestampedMessages.headMap(startTimestampMs).values());
+ }
+ }
+
+ private static class ZeroSleeper implements Sleeper {
+ @Override
+ public void sleep(long l) throws InterruptedException {}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/401eef09/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 80fbfe5..54eb88d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -307,7 +307,8 @@ public class TestDataflowRunnerTest {
*/
@Test
public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -326,7 +327,8 @@ public class TestDataflowRunnerTest {
*/
@Test
public void testCheckingForSuccessWhenPAssertFails() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -342,7 +344,7 @@ public class TestDataflowRunnerTest {
@Test
public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -363,7 +365,7 @@ public class TestDataflowRunnerTest {
*/
@Test
public void testStreamingPipelineFailsIfServiceFails() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);
@@ -422,7 +424,7 @@ public class TestDataflowRunnerTest {
@Test
public void testGetJobMetricsThatSucceeds() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
@@ -438,7 +440,7 @@ public class TestDataflowRunnerTest {
@Test
public void testGetJobMetricsThatFailsForException() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null));
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
Pipeline p = TestPipeline.create(options);
p.apply(Create.of(1, 2, 3));
[2/2] beam git commit: This closes #2796: Do not repeat log messages in DataflowPipelineJob
Posted by ke...@apache.org.
This closes #2796: Do not repeat log messages in DataflowPipelineJob
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3fff52d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3fff52d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3fff52d4
Branch: refs/heads/master
Commit: 3fff52d43cbdc3b34f7d3e2ed7ac62f5ff06ccb6
Parents: 87236ce 401eef0
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 1 11:53:48 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 1 11:53:48 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowPipelineJob.java | 37 +++++-
.../beam/runners/dataflow/DataflowRunner.java | 6 +-
.../dataflow/util/DataflowTemplateJob.java | 2 +-
.../runners/dataflow/util/MonitoringUtil.java | 4 +-
.../dataflow/DataflowPipelineJobTest.java | 127 +++++++++++++++++--
.../testing/TestDataflowRunnerTest.java | 14 +-
6 files changed, 162 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3fff52d4/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------