You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/07 23:12:17 UTC
[1/4] beam git commit: Register the TestDataflowRunner via registrar
Repository: beam
Updated Branches:
refs/heads/master dabad1ae5 -> 53dd0a529
Register the TestDataflowRunner via registrar
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb043d06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb043d06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb043d06
Branch: refs/heads/master
Commit: eb043d06df0da4a47d16047f1f050e41114ac424
Parents: 019d300
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat May 6 05:13:34 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat May 6 13:49:51 2017 -0700
----------------------------------------------------------------------
.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy | 2 +-
.../job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy | 2 +-
.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy | 2 +-
examples/java/pom.xml | 4 ++--
.../apache/beam/runners/dataflow/DataflowPipelineRegistrar.java | 3 ++-
.../beam/runners/dataflow/DataflowPipelineRegistrarTest.java | 2 +-
.../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java | 2 +-
7 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
index 51aedc3..2f05c38 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
@@ -44,5 +44,5 @@ mavenJob('beam_PostCommit_Java_MavenInstall') {
'Run Java PostCommit')
// Maven goals for this job.
- goals('-B -e -P release,dataflow-runner clean install coveralls:report -DrepoToken=$COVERALLS_REPO_TOKEN -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner" ]\'')
+ goals('-B -e -P release,dataflow-runner clean install coveralls:report -DrepoToken=$COVERALLS_REPO_TOKEN -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=TestDataflowRunner" ]\'')
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
index 33235ff..2c739e3 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
@@ -41,5 +41,5 @@ mavenJob('beam_PostCommit_Java_ValidatesRunner_Dataflow') {
'Run Dataflow ValidatesRunner')
// Maven goals for this job.
- goals('-B -e clean verify -am -pl runners/google-cloud-dataflow-java -DforkCount=0 -DvalidatesRunnerPipelineOptions=\'[ "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner", "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-validates-runner-tests/" ]\'')
+ goals('-B -e clean verify -am -pl runners/google-cloud-dataflow-java -DforkCount=0 -DvalidatesRunnerPipelineOptions=\'[ "--runner=TestDataflowRunner", "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-validates-runner-tests/" ]\'')
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy b/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
index f2c3ff0..7284acd 100644
--- a/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
+++ b/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
@@ -41,5 +41,5 @@ mavenJob('beam_Release_NightlySnapshot') {
'dev@beam.apache.org')
// Maven goals for this job.
- goals('-B -e clean deploy -P release,dataflow-runner -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner" ]\'')
+ goals('-B -e clean deploy -P release,dataflow-runner -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=TestDataflowRunner" ]\'')
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index a6ef5d1..21096f7 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -263,7 +263,7 @@
[
"--project=apache-beam-testing",
"--tempRoot=gs://temp-storage-for-end-to-end-tests",
- "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner"
+ "--runner=TestDataflowRunner"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
@@ -287,7 +287,7 @@
[
"--project=apache-beam-testing",
"--tempRoot=gs://temp-storage-for-end-to-end-tests",
- "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
+ "--runner=TestDataflowRunner",
"--streaming=true"
]
</beamTestPipelineOptions>
http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
index b6802bb..f36930f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.testing.TestDataflowRunner;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
@@ -52,7 +53,7 @@ public class DataflowPipelineRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
- DataflowRunner.class);
+ DataflowRunner.class, TestDataflowRunner.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
index 31f9281..728fc71 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
@@ -41,7 +41,7 @@ public class DataflowPipelineRegistrarTest {
@Test
public void testCorrectRunnersAreReturned() {
- assertEquals(ImmutableList.of(DataflowRunner.class),
+ assertEquals(ImmutableList.of(DataflowRunner.class, TestDataflowRunner.class),
new DataflowPipelineRegistrar.Runner().getPipelineRunners());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 96cae51..e04c2f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -72,7 +72,7 @@ import org.junit.runners.model.Statement;
* <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
* options. For example:
* <pre>{@code [
- * "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
+ * "--runner=TestDataflowRunner",
* "--project=mygcpproject",
* "--stagingLocation=gs://mygcsbucket/path"
* ]}</pre>
[2/4] beam git commit: Move TestDataflowRunner into dataflow package
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
deleted file mode 100644
index eb068e6..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ /dev/null
@@ -1,655 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowClient;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/** Tests for {@link TestDataflowRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowRunnerTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
- @Mock private DataflowClient mockClient;
-
- private TestDataflowPipelineOptions options;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
-
- options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setAppName("TestAppName");
- options.setProject("test-project");
- options.setTempLocation("gs://test/temp/location");
- options.setTempRoot("gs://test");
- options.setGcpCredential(new TestCredential());
- options.setRunner(TestDataflowRunner.class);
- options.setPathValidatorClass(NoopPathValidator.class);
- }
-
- @Test
- public void testToString() {
- assertEquals("TestDataflowRunner#TestAppName",
- TestDataflowRunner.fromOptions(options).toString());
- }
-
- @Test
- public void testRunBatchJobThatSucceeds() throws Exception {
- Pipeline p = Pipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
- assertEquals(mockJob, runner.run(p, mockRunner));
- }
-
- @Test
- public void testRunBatchJobThatFails() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */));
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testBatchPipelineFailsIfException() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("FooException"));
- verify(mockJob, never()).cancel();
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- /**
- * A streaming job that terminates with no error messages is a success.
- */
- @Test
- public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- p.apply(Create.of(1, 2, 3));
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of()));
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- runner.run(p, mockRunner);
- }
-
- /**
- * Tests that a streaming job with a false {@link PAssert} fails.
- *
- * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it
- * is detected only by failure job messages. With fuller metric support, this can detect a PAssert
- * failure via metrics and raise an {@link AssertionError} in just that case.
- */
- @Test
- public void testRunStreamingJobThatFails() throws Exception {
- testStreamingPipelineFailsIfException();
- }
-
- private JobMetrics generateMockMetricResponse(boolean success, boolean tentative)
- throws Exception {
- List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
- return buildJobMetrics(metrics);
- }
-
- private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) {
- MetricStructuredName name = new MetricStructuredName();
- name.setName(success ? "PAssertSuccess" : "PAssertFailure");
- name.setContext(
- tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
-
- MetricUpdate metric = new MetricUpdate();
- metric.setName(name);
- metric.setScalar(BigDecimal.ONE);
- return Lists.newArrayList(metric);
- }
-
- private JobMetrics generateMockStreamingMetricResponse(Map<String,
- BigDecimal> metricMap) throws IOException {
- return buildJobMetrics(generateMockStreamingMetrics(metricMap));
- }
-
- private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) {
- List<MetricUpdate> metrics = Lists.newArrayList();
- for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
- MetricStructuredName name = new MetricStructuredName();
- name.setName(entry.getKey());
-
- MetricUpdate metric = new MetricUpdate();
- metric.setName(name);
- metric.setScalar(entry.getValue());
- metrics.add(metric);
- }
- return metrics;
- }
-
- private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
- JobMetrics jobMetrics = new JobMetrics();
- jobMetrics.setMetrics(metricList);
- // N.B. Setting the factory is necessary in order to get valid JSON.
- jobMetrics.setFactory(Transport.getJsonFactory());
- return jobMetrics;
- }
-
- /**
- * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has
- * succeeded.
- */
- @Test
- public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */)));
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- doReturn(State.DONE).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
- }
-
- /**
- * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a
- * conclusive failure.
- */
- @Test
- public void testCheckingForSuccessWhenPAssertFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(
- buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */)));
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- doReturn(State.DONE).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
- }
-
- @Test
- public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(
- buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */)));
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- runner.updatePAssertCount(p);
- doReturn(State.RUNNING).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent()));
- }
-
- /**
- * Tests that if a streaming pipeline terminates in the FAILED state, the check for PAssert
- * success is a conclusive failure.
- */
- @Test
- public void testStreamingPipelineFailsIfServiceFails() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- doReturn(State.FAILED).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
- }
-
- /**
- * Tests that if a streaming pipeline crash loops for a non-assertion reason, the test run
- * throws an {@link AssertionError}.
- *
- * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of
- * failure.
- */
- @Test
- public void testStreamingPipelineFailsIfException() throws Exception {
- options.setStreaming(true);
- Pipeline pipeline = TestPipeline.create(options);
- PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-
- try {
- runner.run(pipeline, mockRunner);
- } catch (AssertionError exc) {
- return;
- }
- fail("AssertionError expected");
- }
-
- @Test
- public void testGetJobMetricsThatSucceeds() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- p.apply(Create.of(1, 2, 3));
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- JobMetrics metrics = runner.getJobMetrics(job);
-
- assertEquals(1, metrics.getMetrics().size());
- assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
- metrics.getMetrics());
- }
-
- @Test
- public void testGetJobMetricsThatFailsForException() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- p.apply(Create.of(1, 2, 3));
-
- when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- assertNull(runner.getJobMetrics(job));
- }
-
- @Test
- public void testBatchOnCreateMatcher() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testStreamingOnCreateMatcher() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
- when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */
- ));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
- runner.run(p, mockRunner);
- }
-
- /**
- * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert} that
- * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
- * invoked.
- */
- @Test
- public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
- when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- verify(mockJob, Mockito.times(1)).waitUntilFinish(
- any(Duration.class), any(JobMessagesHandler.class));
- return;
- }
- fail("Expected an exception on pipeline failure.");
- }
-
- /**
- * Tests that when a streaming pipeline terminates in FAIL that the {@link
- * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
- * invoked.
- */
- @Test
- public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
-
- when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
- .thenReturn(State.FAILED);
-
- runner.run(p, mockRunner);
- // If the onSuccessMatcher were invoked, it would have crashed here.
- }
-
- static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
- SerializableMatcher<PipelineResult> {
- private final DataflowPipelineJob mockJob;
- private final int called;
-
- public TestSuccessMatcher(DataflowPipelineJob job, int times) {
- this.mockJob = job;
- this.called = times;
- }
-
- @Override
- public boolean matches(Object o) {
- if (!(o instanceof PipelineResult)) {
- fail(String.format("Expected PipelineResult but received %s", o));
- }
- try {
- verify(mockJob, Mockito.times(called)).waitUntilFinish(
- any(Duration.class), any(JobMessagesHandler.class));
- } catch (IOException | InterruptedException e) {
- throw new AssertionError(e);
- }
- assertSame(mockJob, o);
- return true;
- }
-
- @Override
- public void describeTo(Description description) {
- }
- }
-
- static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
- SerializableMatcher<PipelineResult> {
- @Override
- public boolean matches(Object o) {
- fail("OnSuccessMatcher should not be called on pipeline failure.");
- return false;
- }
-
- @Override
- public void describeTo(Description description) {
- }
- }
-}
[3/4] beam git commit: Move TestDataflowRunner into dataflow package
Posted by ke...@apache.org.
Move TestDataflowRunner into dataflow package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b46e7b9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b46e7b9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b46e7b9b
Branch: refs/heads/master
Commit: b46e7b9bb113a5b1b75c7aca9de8cbabdd75ff6f
Parents: eb043d0
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat May 6 05:22:03 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat May 6 13:49:52 2017 -0700
----------------------------------------------------------------------
.../dataflow/DataflowPipelineRegistrar.java | 1 -
.../dataflow/TestDataflowPipelineOptions.java | 28 +
.../runners/dataflow/TestDataflowRunner.java | 322 +++++++++
.../testing/TestDataflowPipelineOptions.java | 28 -
.../dataflow/testing/TestDataflowRunner.java | 325 ---------
.../runners/dataflow/testing/package-info.java | 24 -
.../runners/dataflow/DataflowMetricsTest.java | 1 -
.../dataflow/DataflowPipelineJobTest.java | 1 -
.../dataflow/TestDataflowRunnerTest.java | 652 ++++++++++++++++++
.../testing/TestDataflowRunnerTest.java | 655 -------------------
10 files changed, 1002 insertions(+), 1035 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
index f36930f..15855f9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.testing.TestDataflowRunner;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
new file mode 100644
index 0000000..a8acc76
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * A set of options used to configure the {@link TestPipeline}.
+ */
+public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions {
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
new file mode 100644
index 0000000..b81b487
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TestDataflowRunner} is a pipeline runner that wraps a
+ * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
+ *
+ * @see TestPipeline
+ */
+public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+ private static final String TENTATIVE_COUNTER = "tentative";
+ private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
+
+ private final TestDataflowPipelineOptions options;
+ private final DataflowClient dataflowClient;
+ private final DataflowRunner runner;
+ private int expectedNumberOfAssertions = 0;
+
+ TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
+ this.options = options;
+ this.dataflowClient = client;
+ this.runner = DataflowRunner.fromOptions(options);
+ }
+
+ /**
+ * Constructs a runner from the provided options, pointing tempLocation under their temp root.
+ */
+ public static TestDataflowRunner fromOptions(PipelineOptions options) {
+ TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+ String tempLocation = Joiner.on("/").join(
+ dataflowOptions.getTempRoot(),
+ dataflowOptions.getJobName(),
+ "output",
+ "results");
+ dataflowOptions.setTempLocation(tempLocation);
+
+ return new TestDataflowRunner(
+ dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class)));
+ }
+
+ @VisibleForTesting
+ static TestDataflowRunner fromOptionsAndClient(
+ TestDataflowPipelineOptions options, DataflowClient client) {
+ return new TestDataflowRunner(options, client);
+ }
+
+ @Override
+ public DataflowPipelineJob run(Pipeline pipeline) {
+ return run(pipeline, runner);
+ }
+
+ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
+ updatePAssertCount(pipeline);
+
+ TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
+ final DataflowPipelineJob job;
+ job = runner.run(pipeline);
+
+ LOG.info("Running Dataflow job {} with {} expected assertions.",
+ job.getJobId(), expectedNumberOfAssertions);
+
+ assertThat(job, testPipelineOptions.getOnCreateMatcher());
+
+ final ErrorMonitorMessagesHandler messageHandler =
+ new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
+
+ try {
+ Optional<Boolean> result = Optional.absent();
+
+ if (options.isStreaming()) {
+ // In streaming, there are infinite retries, so rather than timing out
+ // we try to terminate early by polling and canceling if we see
+ // an error message.
+ while (true) {
+ State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler);
+ if (state != null && state.isTerminal()) {
+ break;
+ }
+
+ if (messageHandler.hasSeenError()) {
+ if (!job.getState().isTerminal()) {
+ LOG.info("Cancelling Dataflow job {}", job.getJobId());
+ job.cancel();
+ }
+ break;
+ }
+ }
+
+ // Whether we canceled or not, this gets the final state of the job or times out
+ State finalState =
+ job.waitUntilFinish(
+ Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
+
+ // Getting the final state timed out; it may not indicate a failure.
+ // This may be the second cancel request if an error already triggered one above.
+ if (finalState == null || finalState == State.RUNNING) {
+ LOG.info(
+ "Dataflow job {} took longer than {} seconds to complete, cancelling.",
+ job.getJobId(),
+ options.getTestTimeoutSeconds());
+ job.cancel();
+ }
+
+ if (messageHandler.hasSeenError()) {
+ result = Optional.of(false);
+ }
+ } else {
+ job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
+ result = checkForPAssertSuccess(job);
+ }
+
+ if (!result.isPresent()) {
+ if (options.isStreaming()) {
+ LOG.warn(
+ "Dataflow job {} did not output a success or failure metric."
+ + " In rare situations, some PAsserts may not have run."
+ + " This is a known limitation of Dataflow in streaming.",
+ job.getJobId());
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Dataflow job %s did not output a success or failure metric.", job.getJobId()));
+ }
+ } else if (!result.get()) {
+ throw new AssertionError(
+ Strings.isNullOrEmpty(messageHandler.getErrorMessage())
+ ? String.format(
+ "Dataflow job %s terminated in state %s but did not return a failure reason.",
+ job.getJobId(), job.getState())
+ : messageHandler.getErrorMessage());
+ } else {
+ assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return job;
+ }
+
+ @VisibleForTesting
+ void updatePAssertCount(Pipeline pipeline) {
+ if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
+ // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
+ expectedNumberOfAssertions = 0;
+ } else {
+ expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
+ }
+ }
+
+ /**
+ * Check that PAssert expectations were met.
+ *
+ * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
+ * pipeline, then this method will state that all PAsserts succeeded.
+ *
+ * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed,
+ * Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the
+ * evidence is inconclusive.
+ */
+ @VisibleForTesting
+ Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException {
+
+ // If the job failed, this is a definite failure. We only cancel jobs when they fail.
+ State state = job.getState();
+ if (state == State.FAILED || state == State.CANCELLED) {
+ LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state);
+ return Optional.of(false);
+ }
+
+ JobMetrics metrics = getJobMetrics(job);
+ if (metrics == null || metrics.getMetrics() == null) {
+ LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+ return Optional.absent();
+ }
+
+ int successes = 0;
+ int failures = 0;
+ for (MetricUpdate metric : metrics.getMetrics()) {
+ if (metric.getName() == null
+ || metric.getName().getContext() == null
+ || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+ // Don't double count using the non-tentative version of the metric.
+ continue;
+ }
+ if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+ successes += ((BigDecimal) metric.getScalar()).intValue();
+ } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+ failures += ((BigDecimal) metric.getScalar()).intValue();
+ }
+ }
+
+ if (failures > 0) {
+ LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of "
+ + "{} expected assertions.", job.getJobId(), successes, failures,
+ expectedNumberOfAssertions);
+ return Optional.of(false);
+ } else if (successes >= expectedNumberOfAssertions) {
+ LOG.info(
+ "Success result for Dataflow job {}."
+ + " Found {} success, {} failures out of {} expected assertions.",
+ job.getJobId(),
+ successes,
+ failures,
+ expectedNumberOfAssertions);
+ return Optional.of(true);
+ }
+
+ LOG.info(
+ "Inconclusive results for Dataflow job {}."
+ + " Found {} success, {} failures out of {} expected assertions.",
+ job.getJobId(),
+ successes,
+ failures,
+ expectedNumberOfAssertions);
+ return Optional.absent();
+ }
+
+ @Nullable
+ @VisibleForTesting
+ JobMetrics getJobMetrics(DataflowPipelineJob job) {
+ JobMetrics metrics = null;
+ try {
+ metrics = dataflowClient.getJobMetrics(job.getJobId());
+ } catch (IOException e) {
+ LOG.warn("Failed to get job metrics: ", e);
+ }
+ return metrics;
+ }
+
+ @Override
+ public String toString() {
+ return "TestDataflowRunner#" + options.getAppName();
+ }
+
+ /**
+ * Monitors job log output messages for errors.
+ *
+ * <p>Creates an error message representing the concatenation of all error messages seen.
+ */
+ private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
+ private final DataflowPipelineJob job;
+ private final JobMessagesHandler messageHandler;
+ private final StringBuffer errorMessage;
+ private volatile boolean hasSeenError;
+
+ private ErrorMonitorMessagesHandler(
+ DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+ this.job = job;
+ this.messageHandler = messageHandler;
+ this.errorMessage = new StringBuffer();
+ this.hasSeenError = false;
+ }
+
+ @Override
+ public void process(List<JobMessage> messages) {
+ messageHandler.process(messages);
+ for (JobMessage message : messages) {
+ if (message.getMessageImportance() != null
+ && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+ LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+ job.getJobId(), message.getMessageText());
+ errorMessage.append(message.getMessageText());
+ hasSeenError = true;
+ }
+ }
+ }
+
+ boolean hasSeenError() {
+ return hasSeenError;
+ }
+
+ String getErrorMessage() {
+ return errorMessage.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
deleted file mode 100644
index 12f7b39..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-
-/**
- * A set of options used to configure the {@link TestPipeline}.
- */
-public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions {
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
deleted file mode 100644
index ce91915..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowClient;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link TestDataflowRunner} is a pipeline runner that wraps a
- * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
- private static final String TENTATIVE_COUNTER = "tentative";
- private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
-
- private final TestDataflowPipelineOptions options;
- private final DataflowClient dataflowClient;
- private final DataflowRunner runner;
- private int expectedNumberOfAssertions = 0;
-
- TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
- this.options = options;
- this.dataflowClient = client;
- this.runner = DataflowRunner.fromOptions(options);
- }
-
- /**
- * Constructs a runner from the provided options.
- */
- public static TestDataflowRunner fromOptions(PipelineOptions options) {
- TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
- String tempLocation = Joiner.on("/").join(
- dataflowOptions.getTempRoot(),
- dataflowOptions.getJobName(),
- "output",
- "results");
- dataflowOptions.setTempLocation(tempLocation);
-
- return new TestDataflowRunner(
- dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class)));
- }
-
- @VisibleForTesting
- static TestDataflowRunner fromOptionsAndClient(
- TestDataflowPipelineOptions options, DataflowClient client) {
- return new TestDataflowRunner(options, client);
- }
-
- @Override
- public DataflowPipelineJob run(Pipeline pipeline) {
- return run(pipeline, runner);
- }
-
- DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
- updatePAssertCount(pipeline);
-
- TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
- final DataflowPipelineJob job;
- job = runner.run(pipeline);
-
- LOG.info("Running Dataflow job {} with {} expected assertions.",
- job.getJobId(), expectedNumberOfAssertions);
-
- assertThat(job, testPipelineOptions.getOnCreateMatcher());
-
- final ErrorMonitorMessagesHandler messageHandler =
- new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
-
- try {
- Optional<Boolean> result = Optional.absent();
-
- if (options.isStreaming()) {
- // In streaming, there are infinite retries, so rather than timeout
- // we try to terminate early by polling and canceling if we see
- // an error message
- while (true) {
- State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler);
- if (state != null && state.isTerminal()) {
- break;
- }
-
- if (messageHandler.hasSeenError()) {
- if (!job.getState().isTerminal()) {
- LOG.info("Cancelling Dataflow job {}", job.getJobId());
- job.cancel();
- }
- break;
- }
- }
-
- // Whether we canceled or not, this gets the final state of the job or times out
- State finalState =
- job.waitUntilFinish(
- Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
-
- // Getting the final state timed out; it may not indicate a failure.
- // This cancellation may be the second
- if (finalState == null || finalState == State.RUNNING) {
- LOG.info(
- "Dataflow job {} took longer than {} seconds to complete, cancelling.",
- job.getJobId(),
- options.getTestTimeoutSeconds());
- job.cancel();
- }
-
- if (messageHandler.hasSeenError()) {
- result = Optional.of(false);
- }
- } else {
- job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
- result = checkForPAssertSuccess(job);
- }
-
- if (!result.isPresent()) {
- if (options.isStreaming()) {
- LOG.warn(
- "Dataflow job {} did not output a success or failure metric."
- + " In rare situations, some PAsserts may not have run."
- + " This is a known limitation of Dataflow in streaming.",
- job.getJobId());
- } else {
- throw new IllegalStateException(
- String.format(
- "Dataflow job %s did not output a success or failure metric.", job.getJobId()));
- }
- } else if (!result.get()) {
- throw new AssertionError(
- Strings.isNullOrEmpty(messageHandler.getErrorMessage())
- ? String.format(
- "Dataflow job %s terminated in state %s but did not return a failure reason.",
- job.getJobId(), job.getState())
- : messageHandler.getErrorMessage());
- } else {
- assertThat(job, testPipelineOptions.getOnSuccessMatcher());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return job;
- }
-
- @VisibleForTesting
- void updatePAssertCount(Pipeline pipeline) {
- if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
- // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
- expectedNumberOfAssertions = 0;
- } else {
- expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
- }
- }
-
- /**
- * Check that PAssert expectations were met.
- *
- * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
- * pipeline, then this method will state that all PAsserts succeeded.
- *
- * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed,
- * Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the
- * evidence is inconclusive.
- */
- @VisibleForTesting
- Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException {
-
- // If the job failed, this is a definite failure. We only cancel jobs when they fail.
- State state = job.getState();
- if (state == State.FAILED || state == State.CANCELLED) {
- LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state);
- return Optional.of(false);
- }
-
- JobMetrics metrics = getJobMetrics(job);
- if (metrics == null || metrics.getMetrics() == null) {
- LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
- return Optional.absent();
- }
-
- int successes = 0;
- int failures = 0;
- for (MetricUpdate metric : metrics.getMetrics()) {
- if (metric.getName() == null
- || metric.getName().getContext() == null
- || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
- // Don't double count using the non-tentative version of the metric.
- continue;
- }
- if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
- successes += ((BigDecimal) metric.getScalar()).intValue();
- } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
- failures += ((BigDecimal) metric.getScalar()).intValue();
- }
- }
-
- if (failures > 0) {
- LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of "
- + "{} expected assertions.", job.getJobId(), successes, failures,
- expectedNumberOfAssertions);
- return Optional.of(false);
- } else if (successes >= expectedNumberOfAssertions) {
- LOG.info(
- "Success result for Dataflow job {}."
- + " Found {} success, {} failures out of {} expected assertions.",
- job.getJobId(),
- successes,
- failures,
- expectedNumberOfAssertions);
- return Optional.of(true);
- }
-
- LOG.info(
- "Inconclusive results for Dataflow job {}."
- + " Found {} success, {} failures out of {} expected assertions.",
- job.getJobId(),
- successes,
- failures,
- expectedNumberOfAssertions);
- return Optional.absent();
- }
-
- @Nullable
- @VisibleForTesting
- JobMetrics getJobMetrics(DataflowPipelineJob job) {
- JobMetrics metrics = null;
- try {
- metrics = dataflowClient.getJobMetrics(job.getJobId());
- } catch (IOException e) {
- LOG.warn("Failed to get job metrics: ", e);
- }
- return metrics;
- }
-
- @Override
- public String toString() {
- return "TestDataflowRunner#" + options.getAppName();
- }
-
- /**
- * Monitors job log output messages for errors.
- *
- * <p>Creates an error message representing the concatenation of all error messages seen.
- */
- private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
- private final DataflowPipelineJob job;
- private final JobMessagesHandler messageHandler;
- private final StringBuffer errorMessage;
- private volatile boolean hasSeenError;
-
- private ErrorMonitorMessagesHandler(
- DataflowPipelineJob job, JobMessagesHandler messageHandler) {
- this.job = job;
- this.messageHandler = messageHandler;
- this.errorMessage = new StringBuffer();
- this.hasSeenError = false;
- }
-
- @Override
- public void process(List<JobMessage> messages) {
- messageHandler.process(messages);
- for (JobMessage message : messages) {
- if (message.getMessageImportance() != null
- && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
- LOG.info("Dataflow job {} threw exception. Failure message was: {}",
- job.getJobId(), message.getMessageText());
- errorMessage.append(message.getMessageText());
- hasSeenError = true;
- }
- }
- }
-
- boolean hasSeenError() {
- return hasSeenError;
- }
-
- String getErrorMessage() {
- return errorMessage.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
deleted file mode 100644
index 9683df0..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Provides utilities for integration testing and {@link
- * org.apache.beam.sdk.testing.ValidatesRunner} tests of the Google Cloud Dataflow
- * runner.
- */
-package org.apache.beam.runners.dataflow.testing;
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index aabdd84..7e88300 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.math.BigDecimal;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.metrics.MetricQueryResults;
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index f868a17..df894d2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -48,7 +48,6 @@ import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
new file mode 100644
index 0000000..bf15747
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Tests for {@link TestDataflowRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowRunnerTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+ @Mock private DataflowClient mockClient;
+
+ private TestDataflowPipelineOptions options;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+ options.setAppName("TestAppName");
+ options.setProject("test-project");
+ options.setTempLocation("gs://test/temp/location");
+ options.setTempRoot("gs://test");
+ options.setGcpCredential(new TestCredential());
+ options.setRunner(TestDataflowRunner.class);
+ options.setPathValidatorClass(NoopPathValidator.class);
+ }
+
+ @Test
+ public void testToString() {
+ assertEquals("TestDataflowRunner#TestAppName",
+ TestDataflowRunner.fromOptions(options).toString());
+ }
+
+ @Test
+ public void testRunBatchJobThatSucceeds() throws Exception {
+ Pipeline p = Pipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+ assertEquals(mockJob, runner.run(p, mockRunner));
+ }
+
+ @Test
+ public void testRunBatchJobThatFails() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */));
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testBatchPipelineFailsIfException() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+ .thenAnswer(new Answer<State>() {
+ @Override
+ public State answer(InvocationOnMock invocation) {
+ JobMessage message = new JobMessage();
+ message.setMessageText("FooException");
+ message.setTime(TimeUtil.toCloudTime(Instant.now()));
+ message.setMessageImportance("JOB_MESSAGE_ERROR");
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
+ .process(Arrays.asList(message));
+ return State.CANCELLED;
+ }
+ });
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ assertThat(expected.getMessage(), containsString("FooException"));
+ verify(mockJob, never()).cancel();
+ return;
+ }
+ // Note that fail throws an AssertionError which is why it is placed out here
+ // instead of inside the try-catch block.
+ fail("AssertionError expected");
+ }
+
+ /**
+ * A streaming job that terminates with no error messages is a success.
+ */
+ @Test
+ public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ p.apply(Create.of(1, 2, 3));
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of()));
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ runner.run(p, mockRunner);
+ }
+
+ /**
+ * Tests that a streaming job with a false {@link PAssert} fails.
+ *
+ * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it
+ * is detected only by failure job messages. With fuller metric support, this can detect a PAssert
+ * failure via metrics and raise an {@link AssertionError} in just that case.
+ */
+ @Test
+ public void testRunStreamingJobThatFails() throws Exception {
+ testStreamingPipelineFailsIfException();
+ }
+
+ private JobMetrics generateMockMetricResponse(boolean success, boolean tentative)
+ throws Exception {
+ List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
+ return buildJobMetrics(metrics);
+ }
+
+ private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) {
+ MetricStructuredName name = new MetricStructuredName();
+ name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+ name.setContext(
+ tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
+
+ MetricUpdate metric = new MetricUpdate();
+ metric.setName(name);
+ metric.setScalar(BigDecimal.ONE);
+ return Lists.newArrayList(metric);
+ }
+
+ private JobMetrics generateMockStreamingMetricResponse(Map<String,
+ BigDecimal> metricMap) throws IOException {
+ return buildJobMetrics(generateMockStreamingMetrics(metricMap));
+ }
+
+ private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) {
+ List<MetricUpdate> metrics = Lists.newArrayList();
+ for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
+ MetricStructuredName name = new MetricStructuredName();
+ name.setName(entry.getKey());
+
+ MetricUpdate metric = new MetricUpdate();
+ metric.setName(name);
+ metric.setScalar(entry.getValue());
+ metrics.add(metric);
+ }
+ return metrics;
+ }
+
+ private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
+ JobMetrics jobMetrics = new JobMetrics();
+ jobMetrics.setMetrics(metricList);
+ // N.B. Setting the factory is necessary in order to get valid JSON.
+ jobMetrics.setFactory(Transport.getJsonFactory());
+ return jobMetrics;
+ }
+
+ /**
+ * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has
+ * succeeded.
+ */
+ @Test
+ public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */)));
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ doReturn(State.DONE).when(job).getState();
+ assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
+ }
+
+ /**
+ * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a
+ * conclusive failure.
+ */
+ @Test
+ public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+ DataflowPipelineJob job =
+ spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(
+ buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */)));
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ doReturn(State.DONE).when(job).getState();
+ assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
+ }
+
+ @Test
+ public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(
+ buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */)));
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ runner.updatePAssertCount(p);
+ doReturn(State.RUNNING).when(job).getState();
+ assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent()));
+ }
+
+ /**
+ * Tests that if a streaming pipeline terminates in state FAILED, the check for PAssert
+ * success is a conclusive failure.
+ */
+ @Test
+ public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ doReturn(State.FAILED).when(job).getState();
+ assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
+ }
+
+ /**
+ * Tests that if a streaming pipeline crash loops for a non-assertion reason, the test run
+ * throws an {@link AssertionError}.
+ *
+ * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of
+ * failure.
+ */
+ @Test
+ public void testStreamingPipelineFailsIfException() throws Exception {
+ options.setStreaming(true);
+ Pipeline pipeline = TestPipeline.create(options);
+ PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.RUNNING);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+ .thenAnswer(new Answer<State>() {
+ @Override
+ public State answer(InvocationOnMock invocation) {
+ JobMessage message = new JobMessage();
+ message.setMessageText("FooException");
+ message.setTime(TimeUtil.toCloudTime(Instant.now()));
+ message.setMessageImportance("JOB_MESSAGE_ERROR");
+ ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
+ .process(Arrays.asList(message));
+ return State.CANCELLED;
+ }
+ });
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+
+ try {
+ runner.run(pipeline, mockRunner);
+ } catch (AssertionError exc) {
+ return;
+ }
+ fail("AssertionError expected");
+ }
+
+ @Test
+ public void testGetJobMetricsThatSucceeds() throws Exception {
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+ Pipeline p = TestPipeline.create(options);
+ p.apply(Create.of(1, 2, 3));
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ JobMetrics metrics = runner.getJobMetrics(job);
+
+ assertEquals(1, metrics.getMetrics().size());
+ assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
+ metrics.getMetrics());
+ }
+
+ @Test
+ public void testGetJobMetricsThatFailsForException() throws Exception {
+ DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+ Pipeline p = TestPipeline.create(options);
+ p.apply(Create.of(1, 2, 3));
+
+ when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ assertNull(runner.getJobMetrics(job));
+ }
+
+ @Test
+ public void testBatchOnCreateMatcher() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testStreamingOnCreateMatcher() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */
+ ));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ /**
+ * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert},
+ * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
+ * invoked.
+ */
+ @Test
+ public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.DONE);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.DONE);
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+ runner.run(p, mockRunner);
+ }
+
+ @Test
+ public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
+
+ when(mockClient.getJobMetrics(anyString()))
+ .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
+ try {
+ runner.run(p, mockRunner);
+ } catch (AssertionError expected) {
+ verify(mockJob, Mockito.times(1)).waitUntilFinish(
+ any(Duration.class), any(JobMessagesHandler.class));
+ return;
+ }
+ fail("Expected an exception on pipeline failure.");
+ }
+
+ /**
+ * Tests that when a streaming pipeline terminates in FAILED, the {@link
+ * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
+ * invoked.
+ */
+ @Test
+ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
+ options.setStreaming(true);
+ Pipeline p = TestPipeline.create(options);
+ PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+ PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+ final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+ when(mockJob.getState()).thenReturn(State.FAILED);
+ when(mockJob.getProjectId()).thenReturn("test-project");
+ when(mockJob.getJobId()).thenReturn("test-job");
+
+ DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+ when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+ TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
+
+ when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+ .thenReturn(State.FAILED);
+
+ runner.run(p, mockRunner);
+ // If the onSuccessMatcher were invoked, it would have crashed here.
+ }
+
+ static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
+ SerializableMatcher<PipelineResult> {
+ private final DataflowPipelineJob mockJob;
+ private final int called;
+
+ public TestSuccessMatcher(DataflowPipelineJob job, int times) {
+ this.mockJob = job;
+ this.called = times;
+ }
+
+ @Override
+ public boolean matches(Object o) {
+ if (!(o instanceof PipelineResult)) {
+ fail(String.format("Expected PipelineResult but received %s", o));
+ }
+ try {
+ verify(mockJob, Mockito.times(called)).waitUntilFinish(
+ any(Duration.class), any(JobMessagesHandler.class));
+ } catch (IOException | InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ assertSame(mockJob, o);
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ }
+
+ static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
+ SerializableMatcher<PipelineResult> {
+ @Override
+ public boolean matches(Object o) {
+ fail("OnSuccessMatcher should not be called on pipeline failure.");
+ return false;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ }
+}
[4/4] beam git commit: This closes #2940: Move TestDataflowRunner
into dataflow package
Posted by ke...@apache.org.
This closes #2940: Move TestDataflowRunner into dataflow package
Move TestDataflowRunner into dataflow package
Register the TestDataflowRunner via registrar
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/53dd0a52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/53dd0a52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/53dd0a52
Branch: refs/heads/master
Commit: 53dd0a529a3570d4a6edef4f3e59938403817df4
Parents: dabad1a b46e7b9
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun May 7 16:11:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun May 7 16:11:41 2017 -0700
----------------------------------------------------------------------
...job_beam_PostCommit_Java_MavenInstall.groovy | 2 +-
...tCommit_Java_ValidatesRunner_Dataflow.groovy | 2 +-
.../job_beam_Release_NightlySnapshot.groovy | 2 +-
examples/java/pom.xml | 4 +-
.../dataflow/DataflowPipelineRegistrar.java | 2 +-
.../dataflow/TestDataflowPipelineOptions.java | 28 +
.../runners/dataflow/TestDataflowRunner.java | 322 +++++++++
.../testing/TestDataflowPipelineOptions.java | 28 -
.../dataflow/testing/TestDataflowRunner.java | 325 ---------
.../runners/dataflow/testing/package-info.java | 24 -
.../runners/dataflow/DataflowMetricsTest.java | 1 -
.../dataflow/DataflowPipelineJobTest.java | 1 -
.../dataflow/DataflowPipelineRegistrarTest.java | 2 +-
.../dataflow/TestDataflowRunnerTest.java | 652 ++++++++++++++++++
.../testing/TestDataflowRunnerTest.java | 655 -------------------
.../apache/beam/sdk/testing/TestPipeline.java | 2 +-
16 files changed, 1010 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------