You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:30 UTC
[37/50] [abbrv] incubator-beam git commit: Rename
DataflowPipelineRunner to DataflowRunner
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
deleted file mode 100644
index fbaf116..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.Json;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-/** Tests for {@link TestDataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowPipelineRunnerTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
- @Mock private MockHttpTransport transport;
- @Mock private MockLowLevelHttpRequest request;
- @Mock private GcsUtil mockGcsUtil;
-
- private TestDataflowPipelineOptions options;
- private Dataflow service;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
- doCallRealMethod().when(request).getContentAsString();
- service = new Dataflow(transport, Transport.getJsonFactory(), null);
-
- options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setAppName("TestAppName");
- options.setProject("test-project");
- options.setTempLocation("gs://test/temp/location");
- options.setTempRoot("gs://test");
- options.setGcpCredential(new TestCredential());
- options.setDataflowClient(service);
- options.setRunner(TestDataflowPipelineRunner.class);
- options.setPathValidatorClass(NoopPathValidator.class);
- }
-
- @Test
- public void testToString() {
- assertEquals("TestDataflowPipelineRunner#TestAppName",
- new TestDataflowPipelineRunner(options).toString());
- }
-
- @Test
- public void testRunBatchJobThatSucceeds() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- assertEquals(mockJob, runner.run(p, mockRunner));
- }
-
- @Test
- public void testRunBatchJobThatFails() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testBatchPipelineFailsIfException() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("FooException"));
- verify(mockJob, atLeastOnce()).cancel();
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testRunStreamingJobThatSucceeds() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testRunStreamingJobThatFails() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- doReturn(State.DONE).when(job).getState();
- assertEquals(Optional.of(true), runner.checkForSuccess(job));
- }
-
- @Test
- public void testCheckingForSuccessWhenPAssertFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- doReturn(State.DONE).when(job).getState();
- assertEquals(Optional.of(false), runner.checkForSuccess(job));
- }
-
- @Test
- public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, false /* tentative */));
- doReturn(State.RUNNING).when(job).getState();
- assertEquals(Optional.absent(), runner.checkForSuccess(job));
- }
-
- private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
- throws Exception {
- MetricStructuredName name = new MetricStructuredName();
- name.setName(success ? "PAssertSuccess" : "PAssertFailure");
- name.setContext(
- tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
-
- MetricUpdate metric = new MetricUpdate();
- metric.setName(name);
- metric.setScalar(BigDecimal.ONE);
-
- MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
- response.setContentType(Json.MEDIA_TYPE);
- JobMetrics jobMetrics = new JobMetrics();
- jobMetrics.setMetrics(Lists.newArrayList(metric));
- // N.B. Setting the factory is necessary in order to get valid JSON.
- jobMetrics.setFactory(Transport.getJsonFactory());
- response.setContent(jobMetrics.toPrettyString());
- return response;
- }
-
- @Test
- public void testStreamingPipelineFailsIfServiceFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob("test-project", "test-job", service, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, false /* tentative */));
- doReturn(State.FAILED).when(job).getState();
- assertEquals(Optional.of(false), runner.checkForSuccess(job));
- }
-
- @Test
- public void testStreamingPipelineFailsIfException() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("FooException"));
- verify(mockJob, atLeastOnce()).cancel();
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testBatchOnCreateMatcher() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- p.getOptions().as(TestPipelineOptions.class)
- .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testStreamingOnCreateMatcher() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- p.getOptions().as(TestPipelineOptions.class)
- .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(true /* success */, true /* tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestFailureMatcher());
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
- any(JobMessagesHandler.class));
- return;
- }
- fail("Expected an exception on pipeline failure.");
- }
-
- @Test
- public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getDataflowClient()).thenReturn(service);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestFailureMatcher());
-
- when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
- .thenReturn(State.FAILED);
-
- when(request.execute()).thenReturn(
- generateMockMetricResponse(false /* success */, true /* tentative */));
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
- any(JobMessagesHandler.class));
- return;
- }
- fail("Expected an exception on pipeline failure.");
- }
-
- static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
- SerializableMatcher<PipelineResult> {
- private final DataflowPipelineJob mockJob;
- private final int called;
-
- public TestSuccessMatcher(DataflowPipelineJob job, int times) {
- this.mockJob = job;
- this.called = times;
- }
-
- @Override
- public boolean matches(Object o) {
- if (!(o instanceof PipelineResult)) {
- fail(String.format("Expected PipelineResult but received %s", o));
- }
- try {
- verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
- any(JobMessagesHandler.class));
- } catch (IOException | InterruptedException e) {
- throw new AssertionError(e);
- }
- assertSame(mockJob, o);
- return true;
- }
-
- @Override
- public void describeTo(Description description) {
- }
- }
-
- static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
- SerializableMatcher<PipelineResult> {
- @Override
- public boolean matches(Object o) {
- fail("OnSuccessMatcher should not be called on pipeline failure.");
- return false;
- }
-
- @Override
- public void describeTo(Description description) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
new file mode 100644
index 0000000..4067f08
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.testing;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for {@link TestDataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowRunnerTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+ @Mock private MockHttpTransport transport;
+ @Mock private MockLowLevelHttpRequest request;
+ @Mock private GcsUtil mockGcsUtil;
+
+ private TestDataflowPipelineOptions options;
+ private Dataflow service;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
+ doCallRealMethod().when(request).getContentAsString();
+ service = new Dataflow(transport, Transport.getJsonFactory(), null);
+
+ options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+ options.setAppName("TestAppName");
+ options.setProject("test-project");
+ options.setTempLocation("gs://test/temp/location");
+ options.setTempRoot("gs://test");
+ options.setGcpCredential(new TestCredential());
+ options.setDataflowClient(service);
+ options.setRunner(TestDataflowPipelineRunner.class);
+ options.setPathValidatorClass(NoopPathValidator.class);
+ }
+
+ @Test
+ public void testToString() {
+ assertEquals("TestDataflowPipelineRunner#TestAppName",
+ new TestDataflowPipelineRunner(options).toString());
+ }
+
+ @Test
+ public void testRunBatchJobThatSucceeds() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ assertEquals(mockJob, runner.run(p, mockRunner));
+ }
+
+ @Test
+ public void testRunBatchJobThatFails() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testBatchPipelineFailsIfException() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenAnswer(new Answer<State>() {
+ @Override
+ public State answer(InvocationOnMock invocation) {
+ JobMessage message = new JobMessage();
+ message.setMessageText("FooException");
+ message.setTime(TimeUtil.toCloudTime(Instant.now()));
+ message.setMessageImportance("JOB_MESSAGE_ERROR");
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+ .process(Arrays.asList(message));
+ return State.CANCELLED;
+ }
+ });
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ assertThat(expected.getMessage(), containsString("FooException"));
+ verify(mockJob, atLeastOnce()).cancel();
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testRunStreamingJobThatSucceeds() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testRunStreamingJobThatFails() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ doReturn(State.DONE).when(job).getState();
+ assertEquals(Optional.of(true), runner.checkForSuccess(job));
+ }
+
+ @Test
+ public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ doReturn(State.DONE).when(job).getState();
+ assertEquals(Optional.of(false), runner.checkForSuccess(job));
+ }
+
+ @Test
+ public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, false /* tentative */));
+ doReturn(State.RUNNING).when(job).getState();
+ assertEquals(Optional.absent(), runner.checkForSuccess(job));
+ }
+
+ private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
+ throws Exception {
+ MetricStructuredName name = new MetricStructuredName();
+ name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+ name.setContext(
+ tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
+
+ MetricUpdate metric = new MetricUpdate();
+ metric.setName(name);
+ metric.setScalar(BigDecimal.ONE);
+
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ response.setContentType(Json.MEDIA_TYPE);
+ JobMetrics jobMetrics = new JobMetrics();
+ jobMetrics.setMetrics(Lists.newArrayList(metric));
+ // N.B. Setting the factory is necessary in order to get valid JSON.
+ jobMetrics.setFactory(Transport.getJsonFactory());
+ response.setContent(jobMetrics.toPrettyString());
+ return response;
+ }
+
+ @Test
+ public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, false /* tentative */));
+ doReturn(State.FAILED).when(job).getState();
+ assertEquals(Optional.of(false), runner.checkForSuccess(job));
+ }
+
+ @Test
+ public void testStreamingPipelineFailsIfException() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenAnswer(new Answer<State>() {
+ @Override
+ public State answer(InvocationOnMock invocation) {
+ JobMessage message = new JobMessage();
+ message.setMessageText("FooException");
+ message.setTime(TimeUtil.toCloudTime(Instant.now()));
+ message.setMessageImportance("JOB_MESSAGE_ERROR");
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+ .process(Arrays.asList(message));
+ return State.CANCELLED;
+ }
+ });
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ assertThat(expected.getMessage(), containsString("FooException"));
+ verify(mockJob, atLeastOnce()).cancel();
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testBatchOnCreateMatcher() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testStreamingOnCreateMatcher() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestFailureMatcher());
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+ any(JobMessagesHandler.class));
+ return;
+ }
+ fail("Expected an exception on pipeline failure.");
+ }
+
+ @Test
+ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getDataflowClient()).thenReturn(service);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+ p.getOptions().as(TestPipelineOptions.class)
+ .setOnSuccessMatcher(new TestFailureMatcher());
+
+ when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.FAILED);
+
+ when(request.execute()).thenReturn(
+ generateMockMetricResponse(false /* success */, true /* tentative */));
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+ any(JobMessagesHandler.class));
+ return;
+ }
+ fail("Expected an exception on pipeline failure.");
+ }
+
+ static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
+ SerializableMatcher<PipelineResult> {
+ private final DataflowPipelineJob mockJob;
+ private final int called;
+
+ public TestSuccessMatcher(DataflowPipelineJob job, int times) {
+ this.mockJob = job;
+ this.called = times;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ if (!(o instanceof PipelineResult)) {
+ fail(String.format("Expected PipelineResult but received %s", o));
+ }
+ try {
+ verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
+ any(JobMessagesHandler.class));
+ } catch (IOException | InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ assertSame(mockJob, o);
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ }
+
+ static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
+ SerializableMatcher<PipelineResult> {
+ @Override
+ public boolean matches(Object o) {
+ fail("OnSuccessMatcher should not be called on pipeline failure.");
+ return false;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
index 0b865c3..d809cc6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.transforms;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.GcpOptions;
@@ -30,7 +30,7 @@ import com.google.common.collect.Lists;
/**
* Factory methods for creating {@link DisplayDataEvaluator} instances against the
- * {@link DataflowPipelineRunner}.
+ * {@link DataflowRunner}.
*/
public final class DataflowDisplayDataEvaluator {
/** Do not instantiate. */
@@ -43,7 +43,7 @@ public final class DataflowDisplayDataEvaluator {
public static DataflowPipelineOptions getDefaultOptions() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
+ options.setRunner(DataflowRunner.class);
options.setProject("foobar");
options.setTempLocation("gs://bucket/tmpLocation");
options.setFilesToStage(Lists.<String>newArrayList());
@@ -56,7 +56,7 @@ public final class DataflowDisplayDataEvaluator {
/**
* Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
- * the {@link DataflowPipelineRunner}.
+ * the {@link DataflowRunner}.
*/
public static DisplayDataEvaluator create() {
return create(getDefaultOptions());
@@ -64,7 +64,7 @@ public final class DataflowDisplayDataEvaluator {
/**
* Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
- * the {@link DataflowPipelineRunner} with the specified {@code options}.
+ * the {@link DataflowRunner} with the specified {@code options}.
*/
public static DisplayDataEvaluator create(DataflowPipelineOptions options) {
return DisplayDataEvaluator.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index f0e677e..a44b8a7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.transforms;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -46,20 +46,20 @@ import org.junit.runners.JUnit4;
import java.util.Arrays;
import java.util.List;
-/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
+/** Tests for {@link GroupByKey} for the {@link DataflowRunner}. */
@RunWith(JUnit4.class)
public class DataflowGroupByKeyTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
/**
- * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+ * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey}
* is not expanded. This is used for verifying that even without expansion the proper errors show
* up.
*/
private Pipeline createTestServiceRunner() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
+ options.setRunner(DataflowRunner.class);
options.setProject("someproject");
options.setStagingLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index d787500..1b263d2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.transforms;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -44,7 +44,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
+/** Tests for {@link View} for a {@link DataflowRunner}. */
@RunWith(JUnit4.class)
public class DataflowViewTest {
@Rule
@@ -52,7 +52,7 @@ public class DataflowViewTest {
private Pipeline createTestBatchRunner() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
+ options.setRunner(DataflowRunner.class);
options.setProject("someproject");
options.setStagingLocation("gs://staging");
options.setPathValidatorClass(NoopPathValidator.class);
@@ -62,7 +62,7 @@ public class DataflowViewTest {
private Pipeline createTestStreamingRunner() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setRunner(DataflowPipelineRunner.class);
+ options.setRunner(DataflowRunner.class);
options.setStreaming(true);
options.setProject("someproject");
options.setStagingLocation("gs://staging");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
index 5587986..a91f56c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
@@ -21,7 +21,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.GcsUtil;
@@ -52,7 +52,7 @@ public class DataflowPathValidatorTest {
when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setGcpCredential(new TestCredential());
- options.setRunner(DataflowPipelineRunner.class);
+ options.setRunner(DataflowRunner.class);
options.setGcsUtil(mockGcsUtil);
validator = new DataflowPathValidator(options);
}
@@ -66,7 +66,7 @@ public class DataflowPathValidatorTest {
public void testInvalidFilePattern() {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
- "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+ "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
validator.validateInputFilePatternSupported("/local/path");
}
@@ -88,7 +88,7 @@ public class DataflowPathValidatorTest {
public void testInvalidOutputPrefix() {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
- "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+ "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
validator.validateOutputFilePrefixSupported("/local/path");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 61ad24f..2b4464d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -113,7 +113,7 @@ public class SimpleWordCountTest {
String[] words = WORD_BOUNDARY.split(c.element());
// Keep track of the number of lines without any words encountered while tokenizing.
- // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.
+ // This aggregator is visible in the monitoring UI when run using DataflowRunner.
if (words.length == 0) {
emptyLines.addValue(1L);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 22a2241..de3c152 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -158,7 +158,7 @@ public class SerializationTest {
String[] words = WORD_BOUNDARY.split(c.element().toString());
// Keep track of the number of lines without any words encountered while tokenizing.
- // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.
+ // This aggregator is visible in the monitoring UI when run using DataflowRunner.
if (words.length == 0) {
emptyLines.addValue(1L);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
index 479090f..8719384 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
@@ -17,16 +17,16 @@
*/
/**
* Defines runners for executing Pipelines in different modes, including
- * {@link org.apache.beam.sdk.runners.DirectPipelineRunner} and
- * {@link org.apache.beam.sdk.runners.DataflowPipelineRunner}.
+ * {@link org.apache.beam.sdk.runners.DirectRunner} and
+ * {@link org.apache.beam.sdk.runners.DataflowRunner}.
*
- * <p>{@link org.apache.beam.sdk.runners.DirectPipelineRunner} executes a {@code Pipeline}
+ * <p>{@link org.apache.beam.sdk.runners.DirectRunner} executes a {@code Pipeline}
* locally, without contacting the Dataflow service.
- * {@link org.apache.beam.sdk.runners.DataflowPipelineRunner} submits a
+ * {@link org.apache.beam.sdk.runners.DataflowRunner} submits a
* {@code Pipeline} to the Dataflow service, which executes it on Dataflow-managed Compute Engine
- * instances. {@code DataflowPipelineRunner} returns
+ * instances. {@code DataflowRunner} returns
* as soon as the {@code Pipeline} has been submitted. Use
- * {@link org.apache.beam.sdk.runners.BlockingDataflowPipelineRunner} to have execution
+ * {@link org.apache.beam.sdk.runners.BlockingDataflowRunner} to have execution
* updates printed to the console.
*
* <p>The runner is specified as part {@link org.apache.beam.sdk.options.PipelineOptions}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 0dba043..b901268 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -60,7 +60,7 @@ import javax.annotation.Nullable;
* <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
* options. For example:
* <pre>{@code [
- * "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner",
+ * "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
* "--project=mygcpproject",
* "--stagingLocation=gs://mygcsbucket/path"
* ]}</pre>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index 4c98123..329dec5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
* {@code Aggregator} by calling {@link Aggregator#addValue}.
*
* <p>Aggregators are visible in the monitoring UI, when the pipeline is run
- * using DataflowPipelineRunner or BlockingDataflowPipelineRunner, along with
+ * using DataflowRunner or BlockingDataflowRunner, along with
* their current value. Aggregators may not become visible until the system
* begins executing the ParDo transform that created them and/or their initial
* value is changed.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
index ad41a3f..3865654 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
@@ -156,7 +156,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
// Embed schema information into the raw row, so that values have an
// associated key. This matches how rows are read when using the
- // DataflowPipelineRunner.
+ // DataflowRunner.
current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index fb8bb72..f9ce018 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -37,7 +37,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
- * Tests for DataflowPipelineRunner.
+ * Tests for DataflowRunner.
*/
@RunWith(JUnit4.class)
public class PipelineRunnerTest {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index 43c990a..3306cb4 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -70,7 +70,7 @@ import java.util.regex.Pattern;
* <pre>{@code
* --project=YOUR_PROJECT_ID
* --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
+ * --runner=BlockingDataflowRunner
* --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
* }
* </pre>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index 3e4fc86..98af2e7 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -17,7 +17,7 @@
*/
package ${package};
-import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
@@ -63,7 +63,7 @@ public class MinimalWordCount {
// in Google Cloud Storage to stage files.
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
- options.setRunner(BlockingDataflowPipelineRunner.class);
+ options.setRunner(BlockingDataflowRunner.class);
// CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
options.setProject("SET_YOUR_PROJECT_ID_HERE");
// CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
index 7dea9fe..8e56b03 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
@@ -82,7 +82,7 @@ import java.util.List;
* <pre>{@code
* --project=YOUR_PROJECT_ID
* --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
+ * --runner=BlockingDataflowRunner
* }
* </pre>
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index fc1f4b5..07ed6d0 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -77,7 +77,7 @@ import org.apache.beam.sdk.values.PCollection;
* <pre>{@code
* --project=YOUR_PROJECT_ID
* --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- * --runner=BlockingDataflowPipelineRunner
+ * --runner=BlockingDataflowRunner
* }
* </pre>
* and an output prefix on GCS:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
index 6ec4540..82f0eff 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
@@ -17,9 +17,9 @@
*/
package ${package}.common;
-import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -251,10 +251,10 @@ public class DataflowExampleUtils {
}
public void setupRunner() {
- if (options.isStreaming() && options.getRunner().equals(BlockingDataflowPipelineRunner.class)) {
+ if (options.isStreaming() && options.getRunner().equals(BlockingDataflowRunner.class)) {
// In order to cancel the pipelines automatically,
- // {@literal DataflowPipelineRunner} is forced to be used.
- options.setRunner(DataflowPipelineRunner.class);
+ // {@literal DataflowRunner} is forced to be used.
+ options.setRunner(DataflowRunner.class);
}
}
@@ -268,7 +268,7 @@ public class DataflowExampleUtils {
DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
copiedOptions.setStreaming(false);
copiedOptions.setWorkerHarnessContainerImage(
- DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
+ DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
copiedOptions.setNumWorkers(
options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers());
copiedOptions.setJobName(options.getJobName() + "-injector");
@@ -298,7 +298,7 @@ public class DataflowExampleUtils {
}
/**
- * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used,
+ * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
* waits for the pipeline to finish and cancels it (and the injector) before the program exists.
*/
public void waitToFinish(PipelineResult result) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
index 027431f..9a75bb7 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- * --runner=BlockingDataflowPipelineRunner
+ * --runner=BlockingDataflowRunner
*/
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
index bb86b0d..8c71d9d 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- * --runner=BlockingDataflowPipelineRunner
+ * --runner=BlockingDataflowRunner
*/
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 09fe7d9..55aea6a 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -37,7 +37,7 @@
<module>core</module>
<module>io</module>
<!-- sdks/java/maven-archtypes has several dependencies on the
- DataflowPipelineRunner. Until these are refactored out or
+ DataflowRunner. Until these are refactored out or
a released artifact exists, we need to modify the build order.
<module>maven-archetypes</module> -->
<module>extensions</module>