You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/07 23:12:17 UTC

[1/4] beam git commit: Register the TestDataflowRunner via registrar

Repository: beam
Updated Branches:
  refs/heads/master dabad1ae5 -> 53dd0a529


Register the TestDataflowRunner via registrar


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb043d06
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb043d06
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb043d06

Branch: refs/heads/master
Commit: eb043d06df0da4a47d16047f1f050e41114ac424
Parents: 019d300
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat May 6 05:13:34 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat May 6 13:49:51 2017 -0700

----------------------------------------------------------------------
 .test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy | 2 +-
 .../job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy     | 2 +-
 .test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy      | 2 +-
 examples/java/pom.xml                                            | 4 ++--
 .../apache/beam/runners/dataflow/DataflowPipelineRegistrar.java  | 3 ++-
 .../beam/runners/dataflow/DataflowPipelineRegistrarTest.java     | 2 +-
 .../src/main/java/org/apache/beam/sdk/testing/TestPipeline.java  | 2 +-
 7 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
index 51aedc3..2f05c38 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_MavenInstall.groovy
@@ -44,5 +44,5 @@ mavenJob('beam_PostCommit_Java_MavenInstall') {
           'Run Java PostCommit')
 
   // Maven goals for this job.
-  goals('-B -e -P release,dataflow-runner clean install coveralls:report -DrepoToken=$COVERALLS_REPO_TOKEN -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner" ]\'')
+  goals('-B -e -P release,dataflow-runner clean install coveralls:report -DrepoToken=$COVERALLS_REPO_TOKEN -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=TestDataflowRunner" ]\'')
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
index 33235ff..2c739e3 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
@@ -41,5 +41,5 @@ mavenJob('beam_PostCommit_Java_ValidatesRunner_Dataflow') {
     'Run Dataflow ValidatesRunner')
 
   // Maven goals for this job.
-  goals('-B -e clean verify -am -pl runners/google-cloud-dataflow-java -DforkCount=0 -DvalidatesRunnerPipelineOptions=\'[ "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner", "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-validates-runner-tests/" ]\'')
+  goals('-B -e clean verify -am -pl runners/google-cloud-dataflow-java -DforkCount=0 -DvalidatesRunnerPipelineOptions=\'[ "--runner=TestDataflowRunner", "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-validates-runner-tests/" ]\'')
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy b/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
index f2c3ff0..7284acd 100644
--- a/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
+++ b/.test-infra/jenkins/job_beam_Release_NightlySnapshot.groovy
@@ -41,5 +41,5 @@ mavenJob('beam_Release_NightlySnapshot') {
       'dev@beam.apache.org')
 
   // Maven goals for this job.
-  goals('-B -e clean deploy -P release,dataflow-runner -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner" ]\'')
+  goals('-B -e clean deploy -P release,dataflow-runner -DskipITs=false -DintegrationTestPipelineOptions=\'[ "--project=apache-beam-testing", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=TestDataflowRunner" ]\'')
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index a6ef5d1..21096f7 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -263,7 +263,7 @@
                       [
                       "--project=apache-beam-testing",
                       "--tempRoot=gs://temp-storage-for-end-to-end-tests",
-                      "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner"
+                      "--runner=TestDataflowRunner"
                       ]
                     </beamTestPipelineOptions>
                   </systemPropertyVariables>
@@ -287,7 +287,7 @@
                       [
                       "--project=apache-beam-testing",
                       "--tempRoot=gs://temp-storage-for-end-to-end-tests",
-                      "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
+                      "--runner=TestDataflowRunner",
                       "--streaming=true"
                       ]
                     </beamTestPipelineOptions>

http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
index b6802bb..f36930f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.testing.TestDataflowRunner;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
@@ -52,7 +53,7 @@ public class DataflowPipelineRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
       return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          DataflowRunner.class);
+          DataflowRunner.class, TestDataflowRunner.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
index 31f9281..728fc71 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
@@ -41,7 +41,7 @@ public class DataflowPipelineRegistrarTest {
 
   @Test
   public void testCorrectRunnersAreReturned() {
-    assertEquals(ImmutableList.of(DataflowRunner.class),
+    assertEquals(ImmutableList.of(DataflowRunner.class, TestDataflowRunner.class),
         new DataflowPipelineRegistrar.Runner().getPipelineRunners());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/eb043d06/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 96cae51..e04c2f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -72,7 +72,7 @@ import org.junit.runners.model.Statement;
  * <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
  *     options. For example:
  *     <pre>{@code [
- *     "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
+ *     "--runner=TestDataflowRunner",
  *     "--project=mygcpproject",
  *     "--stagingLocation=gs://mygcsbucket/path"
  *     ]}</pre>


[2/4] beam git commit: Move TestDataflowRunner into dataflow package

Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
deleted file mode 100644
index eb068e6..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ /dev/null
@@ -1,655 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowClient;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/** Tests for {@link TestDataflowRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowRunnerTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-  @Mock private DataflowClient mockClient;
-
-  private TestDataflowPipelineOptions options;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-
-    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setAppName("TestAppName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setTempRoot("gs://test");
-    options.setGcpCredential(new TestCredential());
-    options.setRunner(TestDataflowRunner.class);
-    options.setPathValidatorClass(NoopPathValidator.class);
-  }
-
-  @Test
-  public void testToString() {
-    assertEquals("TestDataflowRunner#TestAppName",
-        TestDataflowRunner.fromOptions(options).toString());
-  }
-
-  @Test
-  public void testRunBatchJobThatSucceeds() throws Exception {
-    Pipeline p = Pipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    assertEquals(mockJob, runner.run(p, mockRunner));
-  }
-
-  @Test
-  public void testRunBatchJobThatFails() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testBatchPipelineFailsIfException() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, never()).cancel();
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  /**
-   * A streaming job that terminates with no error messages is a success.
-   */
-  @Test
-  public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of()));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    runner.run(p, mockRunner);
-  }
-
-  /**
-   * Tests that a streaming job with a false {@link PAssert} fails.
-   *
-   * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it
-   * is detected only by failure job messages. With fuller metric support, this can detect a PAssert
-   * failure via metrics and raise an {@link AssertionError} in just that case.
-   */
-  @Test
-  public void testRunStreamingJobThatFails() throws Exception {
-    testStreamingPipelineFailsIfException();
-  }
-
-  private JobMetrics generateMockMetricResponse(boolean success, boolean tentative)
-      throws Exception {
-    List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
-    return buildJobMetrics(metrics);
-  }
-
-  private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) {
-    MetricStructuredName name = new MetricStructuredName();
-    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
-    name.setContext(
-        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
-
-    MetricUpdate metric = new MetricUpdate();
-    metric.setName(name);
-    metric.setScalar(BigDecimal.ONE);
-    return Lists.newArrayList(metric);
-  }
-
-  private JobMetrics generateMockStreamingMetricResponse(Map<String,
-      BigDecimal> metricMap) throws IOException {
-    return buildJobMetrics(generateMockStreamingMetrics(metricMap));
-  }
-
-  private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) {
-    List<MetricUpdate> metrics = Lists.newArrayList();
-    for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
-      MetricStructuredName name = new MetricStructuredName();
-      name.setName(entry.getKey());
-
-      MetricUpdate metric = new MetricUpdate();
-      metric.setName(name);
-      metric.setScalar(entry.getValue());
-      metrics.add(metric);
-    }
-    return metrics;
-  }
-
-  private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
-    JobMetrics jobMetrics = new JobMetrics();
-    jobMetrics.setMetrics(metricList);
-    // N.B. Setting the factory is necessary in order to get valid JSON.
-    jobMetrics.setFactory(Transport.getJsonFactory());
-    return jobMetrics;
-  }
-
-  /**
-   * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has
-   * succeeded.
-   */
-  @Test
-  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */)));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.DONE).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
-  }
-
-  /**
-   * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a
-   * conclusive failure.
-   */
-  @Test
-  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(
-            buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */)));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.DONE).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
-  }
-
-  @Test
-  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(
-            buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */)));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    runner.updatePAssertCount(p);
-    doReturn(State.RUNNING).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent()));
-  }
-
-  /**
-   * Tests that if a streaming pipeline terminates with FAIL that the check for PAssert
-   * success is a conclusive failure.
-   */
-  @Test
-  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.FAILED).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
-  }
-
-  /**
-   * Tests that if a streaming pipeline crash loops for a non-assertion reason that the test run
-   * throws an {@link AssertionError}.
-   *
-   * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of
-   * failure.
-   */
-  @Test
-  public void testStreamingPipelineFailsIfException() throws Exception {
-    options.setStreaming(true);
-    Pipeline pipeline = TestPipeline.create(options);
-    PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-
-    try {
-      runner.run(pipeline, mockRunner);
-    } catch (AssertionError exc) {
-      return;
-    }
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testGetJobMetricsThatSucceeds() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    JobMetrics metrics = runner.getJobMetrics(job);
-
-    assertEquals(1, metrics.getMetrics().size());
-    assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
-        metrics.getMetrics());
-  }
-
-  @Test
-  public void testGetJobMetricsThatFailsForException() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    assertNull(runner.getJobMetrics(job));
-  }
-
-  @Test
-  public void testBatchOnCreateMatcher() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testStreamingOnCreateMatcher() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */
-        ));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  /**
-   * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert} that
-   * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
-   * invoked.
-   */
-  @Test
-  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      verify(mockJob, Mockito.times(1)).waitUntilFinish(
-          any(Duration.class), any(JobMessagesHandler.class));
-      return;
-    }
-    fail("Expected an exception on pipeline failure.");
-  }
-
-  /**
-   * Tests that when a streaming pipeline terminates in FAIL that the {@link
-   * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
-   * invoked.
-   */
-  @Test
-  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
-
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.FAILED);
-
-    runner.run(p, mockRunner);
-    // If the onSuccessMatcher were invoked, it would have crashed here.
-  }
-
-  static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
-      SerializableMatcher<PipelineResult> {
-    private final DataflowPipelineJob mockJob;
-    private final int called;
-
-    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
-      this.mockJob = job;
-      this.called = times;
-    }
-
-    @Override
-    public boolean matches(Object o) {
-      if (!(o instanceof PipelineResult)) {
-        fail(String.format("Expected PipelineResult but received %s", o));
-      }
-      try {
-        verify(mockJob, Mockito.times(called)).waitUntilFinish(
-            any(Duration.class), any(JobMessagesHandler.class));
-      } catch (IOException | InterruptedException e) {
-        throw new AssertionError(e);
-      }
-      assertSame(mockJob, o);
-      return true;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-    }
-  }
-
-  static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
-      SerializableMatcher<PipelineResult> {
-    @Override
-    public boolean matches(Object o) {
-      fail("OnSuccessMatcher should not be called on pipeline failure.");
-      return false;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-    }
-  }
-}


[3/4] beam git commit: Move TestDataflowRunner into dataflow package

Posted by ke...@apache.org.
Move TestDataflowRunner into dataflow package


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b46e7b9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b46e7b9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b46e7b9b

Branch: refs/heads/master
Commit: b46e7b9bb113a5b1b75c7aca9de8cbabdd75ff6f
Parents: eb043d0
Author: Kenneth Knowles <kl...@google.com>
Authored: Sat May 6 05:22:03 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat May 6 13:49:52 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineRegistrar.java     |   1 -
 .../dataflow/TestDataflowPipelineOptions.java   |  28 +
 .../runners/dataflow/TestDataflowRunner.java    | 322 +++++++++
 .../testing/TestDataflowPipelineOptions.java    |  28 -
 .../dataflow/testing/TestDataflowRunner.java    | 325 ---------
 .../runners/dataflow/testing/package-info.java  |  24 -
 .../runners/dataflow/DataflowMetricsTest.java   |   1 -
 .../dataflow/DataflowPipelineJobTest.java       |   1 -
 .../dataflow/TestDataflowRunnerTest.java        | 652 ++++++++++++++++++
 .../testing/TestDataflowRunnerTest.java         | 655 -------------------
 10 files changed, 1002 insertions(+), 1035 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
index f36930f..15855f9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.testing.TestDataflowRunner;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
new file mode 100644
index 0000000..a8acc76
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * A set of options used to configure the {@link TestPipeline}.
+ */
+public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions {
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
new file mode 100644
index 0000000..b81b487
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TestDataflowRunner} is a pipeline runner that wraps a
+ * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
+ *
+ * @see TestPipeline
+ */
+public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+  private static final String TENTATIVE_COUNTER = "tentative";
+  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
+
+  private final TestDataflowPipelineOptions options;
+  private final DataflowClient dataflowClient;
+  private final DataflowRunner runner;
+  private int expectedNumberOfAssertions = 0;
+
+  TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
+    this.options = options;
+    this.dataflowClient = client;
+    this.runner = DataflowRunner.fromOptions(options);
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static TestDataflowRunner fromOptions(PipelineOptions options) {
+    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+    String tempLocation = Joiner.on("/").join(
+        dataflowOptions.getTempRoot(),
+        dataflowOptions.getJobName(),
+        "output",
+        "results");
+    dataflowOptions.setTempLocation(tempLocation);
+
+    return new TestDataflowRunner(
+        dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class)));
+  }
+
+  @VisibleForTesting
+  static TestDataflowRunner fromOptionsAndClient(
+      TestDataflowPipelineOptions options, DataflowClient client) {
+    return new TestDataflowRunner(options, client);
+  }
+
+  @Override
+  public DataflowPipelineJob run(Pipeline pipeline) {
+    return run(pipeline, runner);
+  }
+
+  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
+    updatePAssertCount(pipeline);
+
+    TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
+    final DataflowPipelineJob job;
+    job = runner.run(pipeline);
+
+    LOG.info("Running Dataflow job {} with {} expected assertions.",
+        job.getJobId(), expectedNumberOfAssertions);
+
+    assertThat(job, testPipelineOptions.getOnCreateMatcher());
+
+    final ErrorMonitorMessagesHandler messageHandler =
+        new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
+
+    try {
+      Optional<Boolean> result = Optional.absent();
+
+      if (options.isStreaming()) {
+        // In streaming, there are infinite retries, so rather than timeout
+        // we try to terminate early by polling and canceling if we see
+        // an error message
+        while (true) {
+          State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler);
+          if (state != null && state.isTerminal()) {
+            break;
+          }
+
+          if (messageHandler.hasSeenError()) {
+            if (!job.getState().isTerminal()) {
+              LOG.info("Cancelling Dataflow job {}", job.getJobId());
+              job.cancel();
+            }
+            break;
+          }
+        }
+
+        // Whether we canceled or not, this gets the final state of the job or times out
+        State finalState =
+            job.waitUntilFinish(
+                Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
+
+        // Getting the final state timed out; it may not indicate a failure.
+        // This cancellation may be the second
+        if (finalState == null || finalState == State.RUNNING) {
+          LOG.info(
+              "Dataflow job {} took longer than {} seconds to complete, cancelling.",
+              job.getJobId(),
+              options.getTestTimeoutSeconds());
+          job.cancel();
+        }
+
+        if (messageHandler.hasSeenError()) {
+          result = Optional.of(false);
+        }
+      } else {
+        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
+        result = checkForPAssertSuccess(job);
+      }
+
+      if (!result.isPresent()) {
+        if (options.isStreaming()) {
+          LOG.warn(
+              "Dataflow job {} did not output a success or failure metric."
+                  + " In rare situations, some PAsserts may not have run."
+                  + " This is a known limitation of Dataflow in streaming.",
+              job.getJobId());
+        } else {
+          throw new IllegalStateException(
+              String.format(
+                  "Dataflow job %s did not output a success or failure metric.", job.getJobId()));
+        }
+      } else if (!result.get()) {
+        throw new AssertionError(
+            Strings.isNullOrEmpty(messageHandler.getErrorMessage())
+                ? String.format(
+                    "Dataflow job %s terminated in state %s but did not return a failure reason.",
+                    job.getJobId(), job.getState())
+                : messageHandler.getErrorMessage());
+      } else {
+        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return job;
+  }
+
+  @VisibleForTesting
+  void updatePAssertCount(Pipeline pipeline) {
+    if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
+      // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
+      expectedNumberOfAssertions = 0;
+    } else {
+      expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
+    }
+  }
+
+  /**
+   * Check that PAssert expectations were met.
+   *
+   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
+   * pipeline, then this method will state that all PAsserts succeeded.
+   *
+   * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed,
+   *     Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the
+   *     evidence is inconclusive.
+   */
+  @VisibleForTesting
+  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException {
+
+    // If the job failed, this is a definite failure. We only cancel jobs when they fail.
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state);
+      return Optional.of(false);
+    }
+
+    JobMetrics metrics = getJobMetrics(job);
+    if (metrics == null || metrics.getMetrics() == null) {
+      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+      return Optional.absent();
+    }
+
+    int successes = 0;
+    int failures = 0;
+    for (MetricUpdate metric : metrics.getMetrics()) {
+      if (metric.getName() == null
+          || metric.getName().getContext() == null
+          || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+        // Don't double count using the non-tentative version of the metric.
+        continue;
+      }
+      if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+        successes += ((BigDecimal) metric.getScalar()).intValue();
+      } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+        failures += ((BigDecimal) metric.getScalar()).intValue();
+      }
+    }
+
+    if (failures > 0) {
+      LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of "
+          + "{} expected assertions.", job.getJobId(), successes, failures,
+          expectedNumberOfAssertions);
+      return Optional.of(false);
+    } else if (successes >= expectedNumberOfAssertions) {
+      LOG.info(
+          "Success result for Dataflow job {}."
+              + " Found {} success, {} failures out of {} expected assertions.",
+          job.getJobId(),
+          successes,
+          failures,
+          expectedNumberOfAssertions);
+      return Optional.of(true);
+    }
+
+    LOG.info(
+        "Inconclusive results for Dataflow job {}."
+            + " Found {} success, {} failures out of {} expected assertions.",
+        job.getJobId(),
+        successes,
+        failures,
+        expectedNumberOfAssertions);
+    return Optional.absent();
+  }
+
+  @Nullable
+  @VisibleForTesting
+  JobMetrics getJobMetrics(DataflowPipelineJob job) {
+    JobMetrics metrics = null;
+    try {
+      metrics = dataflowClient.getJobMetrics(job.getJobId());
+    } catch (IOException e) {
+      LOG.warn("Failed to get job metrics: ", e);
+    }
+    return metrics;
+  }
+
+  @Override
+  public String toString() {
+    return "TestDataflowRunner#" + options.getAppName();
+  }
+
+  /**
+   * Monitors job log output messages for errors.
+   *
+   * <p>Creates an error message representing the concatenation of all error messages seen.
+   */
+  private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
+    private final DataflowPipelineJob job;
+    private final JobMessagesHandler messageHandler;
+    private final StringBuffer errorMessage;
+    private volatile boolean hasSeenError;
+
+    private ErrorMonitorMessagesHandler(
+        DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+      this.job = job;
+      this.messageHandler = messageHandler;
+      this.errorMessage = new StringBuffer();
+      this.hasSeenError = false;
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      messageHandler.process(messages);
+      for (JobMessage message : messages) {
+        if (message.getMessageImportance() != null
+            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(), message.getMessageText());
+          errorMessage.append(message.getMessageText());
+          hasSeenError = true;
+        }
+      }
+    }
+
+    boolean hasSeenError() {
+      return hasSeenError;
+    }
+
+    String getErrorMessage() {
+      return errorMessage.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
deleted file mode 100644
index 12f7b39..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-
-/**
- * A set of options used to configure the {@link TestPipeline}.
- */
-public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions {
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
deleted file mode 100644
index ce91915..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowClient;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link TestDataflowRunner} is a pipeline runner that wraps a
- * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
-  private static final String TENTATIVE_COUNTER = "tentative";
-  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
-
-  private final TestDataflowPipelineOptions options;
-  private final DataflowClient dataflowClient;
-  private final DataflowRunner runner;
-  private int expectedNumberOfAssertions = 0;
-
-  TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
-    this.options = options;
-    this.dataflowClient = client;
-    this.runner = DataflowRunner.fromOptions(options);
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static TestDataflowRunner fromOptions(PipelineOptions options) {
-    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-    String tempLocation = Joiner.on("/").join(
-        dataflowOptions.getTempRoot(),
-        dataflowOptions.getJobName(),
-        "output",
-        "results");
-    dataflowOptions.setTempLocation(tempLocation);
-
-    return new TestDataflowRunner(
-        dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class)));
-  }
-
-  @VisibleForTesting
-  static TestDataflowRunner fromOptionsAndClient(
-      TestDataflowPipelineOptions options, DataflowClient client) {
-    return new TestDataflowRunner(options, client);
-  }
-
-  @Override
-  public DataflowPipelineJob run(Pipeline pipeline) {
-    return run(pipeline, runner);
-  }
-
-  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
-    updatePAssertCount(pipeline);
-
-    TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
-    final DataflowPipelineJob job;
-    job = runner.run(pipeline);
-
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
-
-    assertThat(job, testPipelineOptions.getOnCreateMatcher());
-
-    final ErrorMonitorMessagesHandler messageHandler =
-        new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
-
-    try {
-      Optional<Boolean> result = Optional.absent();
-
-      if (options.isStreaming()) {
-        // In streaming, there are infinite retries, so rather than timeout
-        // we try to terminate early by polling and canceling if we see
-        // an error message
-        while (true) {
-          State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler);
-          if (state != null && state.isTerminal()) {
-            break;
-          }
-
-          if (messageHandler.hasSeenError()) {
-            if (!job.getState().isTerminal()) {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-            break;
-          }
-        }
-
-        // Whether we canceled or not, this gets the final state of the job or times out
-        State finalState =
-            job.waitUntilFinish(
-                Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
-
-        // Getting the final state timed out; it may not indicate a failure.
-        // This cancellation may be the second
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info(
-              "Dataflow job {} took longer than {} seconds to complete, cancelling.",
-              job.getJobId(),
-              options.getTestTimeoutSeconds());
-          job.cancel();
-        }
-
-        if (messageHandler.hasSeenError()) {
-          result = Optional.of(false);
-        }
-      } else {
-        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
-        result = checkForPAssertSuccess(job);
-      }
-
-      if (!result.isPresent()) {
-        if (options.isStreaming()) {
-          LOG.warn(
-              "Dataflow job {} did not output a success or failure metric."
-                  + " In rare situations, some PAsserts may not have run."
-                  + " This is a known limitation of Dataflow in streaming.",
-              job.getJobId());
-        } else {
-          throw new IllegalStateException(
-              String.format(
-                  "Dataflow job %s did not output a success or failure metric.", job.getJobId()));
-        }
-      } else if (!result.get()) {
-        throw new AssertionError(
-            Strings.isNullOrEmpty(messageHandler.getErrorMessage())
-                ? String.format(
-                    "Dataflow job %s terminated in state %s but did not return a failure reason.",
-                    job.getJobId(), job.getState())
-                : messageHandler.getErrorMessage());
-      } else {
-        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return job;
-  }
-
-  @VisibleForTesting
-  void updatePAssertCount(Pipeline pipeline) {
-    if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
-      // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
-      expectedNumberOfAssertions = 0;
-    } else {
-      expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
-    }
-  }
-
-  /**
-   * Check that PAssert expectations were met.
-   *
-   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
-   * pipeline, then this method will state that all PAsserts succeeded.
-   *
-   * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed,
-   *     Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the
-   *     evidence is inconclusive.
-   */
-  @VisibleForTesting
-  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException {
-
-    // If the job failed, this is a definite failure. We only cancel jobs when they fail.
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state);
-      return Optional.of(false);
-    }
-
-    JobMetrics metrics = getJobMetrics(job);
-    if (metrics == null || metrics.getMetrics() == null) {
-      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-      return Optional.absent();
-    }
-
-    int successes = 0;
-    int failures = 0;
-    for (MetricUpdate metric : metrics.getMetrics()) {
-      if (metric.getName() == null
-          || metric.getName().getContext() == null
-          || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-        // Don't double count using the non-tentative version of the metric.
-        continue;
-      }
-      if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
-        successes += ((BigDecimal) metric.getScalar()).intValue();
-      } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
-        failures += ((BigDecimal) metric.getScalar()).intValue();
-      }
-    }
-
-    if (failures > 0) {
-      LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of "
-          + "{} expected assertions.", job.getJobId(), successes, failures,
-          expectedNumberOfAssertions);
-      return Optional.of(false);
-    } else if (successes >= expectedNumberOfAssertions) {
-      LOG.info(
-          "Success result for Dataflow job {}."
-              + " Found {} success, {} failures out of {} expected assertions.",
-          job.getJobId(),
-          successes,
-          failures,
-          expectedNumberOfAssertions);
-      return Optional.of(true);
-    }
-
-    LOG.info(
-        "Inconclusive results for Dataflow job {}."
-            + " Found {} success, {} failures out of {} expected assertions.",
-        job.getJobId(),
-        successes,
-        failures,
-        expectedNumberOfAssertions);
-    return Optional.absent();
-  }
-
-  @Nullable
-  @VisibleForTesting
-  JobMetrics getJobMetrics(DataflowPipelineJob job) {
-    JobMetrics metrics = null;
-    try {
-      metrics = dataflowClient.getJobMetrics(job.getJobId());
-    } catch (IOException e) {
-      LOG.warn("Failed to get job metrics: ", e);
-    }
-    return metrics;
-  }
-
-  @Override
-  public String toString() {
-    return "TestDataflowRunner#" + options.getAppName();
-  }
-
-  /**
-   * Monitors job log output messages for errors.
-   *
-   * <p>Creates an error message representing the concatenation of all error messages seen.
-   */
-  private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
-    private final DataflowPipelineJob job;
-    private final JobMessagesHandler messageHandler;
-    private final StringBuffer errorMessage;
-    private volatile boolean hasSeenError;
-
-    private ErrorMonitorMessagesHandler(
-        DataflowPipelineJob job, JobMessagesHandler messageHandler) {
-      this.job = job;
-      this.messageHandler = messageHandler;
-      this.errorMessage = new StringBuffer();
-      this.hasSeenError = false;
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      messageHandler.process(messages);
-      for (JobMessage message : messages) {
-        if (message.getMessageImportance() != null
-            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
-              job.getJobId(), message.getMessageText());
-          errorMessage.append(message.getMessageText());
-          hasSeenError = true;
-        }
-      }
-    }
-
-    boolean hasSeenError() {
-      return hasSeenError;
-    }
-
-    String getErrorMessage() {
-      return errorMessage.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
deleted file mode 100644
index 9683df0..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Provides utilities for integration testing and {@link
- * org.apache.beam.sdk.testing.ValidatesRunner} tests of the Google Cloud Dataflow
- * runner.
- */
-package org.apache.beam.runners.dataflow.testing;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index aabdd84..7e88300 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.math.BigDecimal;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.metrics.MetricQueryResults;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index f868a17..df894d2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -48,7 +48,6 @@ import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
 import org.apache.beam.sdk.Pipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
new file mode 100644
index 0000000..bf15747
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Tests for {@link TestDataflowRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowRunnerTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Mock private DataflowClient mockClient;
+
+  private TestDataflowPipelineOptions options;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+
+    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setAppName("TestAppName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setTempRoot("gs://test");
+    options.setGcpCredential(new TestCredential());
+    options.setRunner(TestDataflowRunner.class);
+    options.setPathValidatorClass(NoopPathValidator.class);
+  }
+
+  @Test
+  public void testToString() {
+    assertEquals("TestDataflowRunner#TestAppName",
+        TestDataflowRunner.fromOptions(options).toString());
+  }
+
+  @Test
+  public void testRunBatchJobThatSucceeds() throws Exception {
+    Pipeline p = Pipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    assertEquals(mockJob, runner.run(p, mockRunner));
+  }
+
+  @Test
+  public void testRunBatchJobThatFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testBatchPipelineFailsIfException() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, never()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  /**
+   * A streaming job that terminates with no error messages is a success.
+   */
+  @Test
+  public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of()));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
+
+  /**
+   * Tests that a streaming job with a false {@link PAssert} fails.
+   *
+   * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it
+   * is detected only by failure job messages. With fuller metric support, this can detect a PAssert
+   * failure via metrics and raise an {@link AssertionError} in just that case.
+   */
+  @Test
+  public void testRunStreamingJobThatFails() throws Exception {
+    testStreamingPipelineFailsIfException();
+  }
+
+  private JobMetrics generateMockMetricResponse(boolean success, boolean tentative)
+      throws Exception {
+    List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
+    return buildJobMetrics(metrics);
+  }
+
+  private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) {
+    MetricStructuredName name = new MetricStructuredName();
+    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+    name.setContext(
+        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
+
+    MetricUpdate metric = new MetricUpdate();
+    metric.setName(name);
+    metric.setScalar(BigDecimal.ONE);
+    return Lists.newArrayList(metric);
+  }
+
+  private JobMetrics generateMockStreamingMetricResponse(Map<String,
+      BigDecimal> metricMap) throws IOException {
+    return buildJobMetrics(generateMockStreamingMetrics(metricMap));
+  }
+
+  private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) {
+    List<MetricUpdate> metrics = Lists.newArrayList();
+    for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
+      MetricStructuredName name = new MetricStructuredName();
+      name.setName(entry.getKey());
+
+      MetricUpdate metric = new MetricUpdate();
+      metric.setName(name);
+      metric.setScalar(entry.getValue());
+      metrics.add(metric);
+    }
+    return metrics;
+  }
+
+  private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(metricList);
+    // N.B. Setting the factory is necessary in order to get valid JSON.
+    jobMetrics.setFactory(Transport.getJsonFactory());
+    return jobMetrics;
+  }
+
+  /**
+   * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has
+   * succeeded.
+   */
+  @Test
+  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */)));
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    doReturn(State.DONE).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
+  }
+
+  /**
+   * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a
+   * conclusive failure.
+   */
+  @Test
+  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(
+            buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */)));
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    doReturn(State.DONE).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
+  }
+
+  @Test
+  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(
+            buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */)));
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.updatePAssertCount(p);
+    doReturn(State.RUNNING).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent()));
+  }
+
+  /**
+   * Tests that if a streaming pipeline terminates with FAIL that the check for PAssert
+   * success is a conclusive failure.
+   */
+  @Test
+  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    doReturn(State.FAILED).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
+  }
+
+  /**
+   * Tests that if a streaming pipeline crash loops for a non-assertion reason that the test run
+   * throws an {@link AssertionError}.
+   *
+   * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of
+   * failure.
+   */
+  @Test
+  public void testStreamingPipelineFailsIfException() throws Exception {
+    options.setStreaming(true);
+    Pipeline pipeline = TestPipeline.create(options);
+    PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+
+    try {
+      runner.run(pipeline, mockRunner);
+    } catch (AssertionError exc) {
+      return;
+    }
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testGetJobMetricsThatSucceeds() throws Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    JobMetrics metrics = runner.getJobMetrics(job);
+
+    assertEquals(1, metrics.getMetrics().size());
+    assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
+        metrics.getMetrics());
+  }
+
+  @Test
+  public void testGetJobMetricsThatFailsForException() throws Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    assertNull(runner.getJobMetrics(job));
+  }
+
+  @Test
+  public void testBatchOnCreateMatcher() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testStreamingOnCreateMatcher() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */
+        ));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  /**
+   * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert} that
+   * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
+   * invoked.
+   */
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      verify(mockJob, Mockito.times(1)).waitUntilFinish(
+          any(Duration.class), any(JobMessagesHandler.class));
+      return;
+    }
+    fail("Expected an exception on pipeline failure.");
+  }
+
+  /**
+   * Tests that when a streaming pipeline terminates in FAIL that the {@link
+   * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
+   * invoked.
+   */
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
+
+    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.FAILED);
+
+    runner.run(p, mockRunner);
+    // If the onSuccessMatcher were invoked, it would have crashed here.
+  }
+
+  static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
+      SerializableMatcher<PipelineResult> {
+    private final DataflowPipelineJob mockJob;
+    private final int called;
+
+    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
+      this.mockJob = job;
+      this.called = times;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof PipelineResult)) {
+        fail(String.format("Expected PipelineResult but received %s", o));
+      }
+      try {
+        verify(mockJob, Mockito.times(called)).waitUntilFinish(
+            any(Duration.class), any(JobMessagesHandler.class));
+      } catch (IOException | InterruptedException e) {
+        throw new AssertionError(e);
+      }
+      assertSame(mockJob, o);
+      return true;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+    }
+  }
+
+  static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
+      SerializableMatcher<PipelineResult> {
+    @Override
+    public boolean matches(Object o) {
+      fail("OnSuccessMatcher should not be called on pipeline failure.");
+      return false;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+    }
+  }
+}


[4/4] beam git commit: This closes #2940: Move TestDataflowRunner into dataflow package

Posted by ke...@apache.org.
This closes #2940: Move TestDataflowRunner into dataflow package

  Move TestDataflowRunner into dataflow package
  Register the TestDataflowRunner via registrar


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/53dd0a52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/53dd0a52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/53dd0a52

Branch: refs/heads/master
Commit: 53dd0a529a3570d4a6edef4f3e59938403817df4
Parents: dabad1a b46e7b9
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun May 7 16:11:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun May 7 16:11:41 2017 -0700

----------------------------------------------------------------------
 ...job_beam_PostCommit_Java_MavenInstall.groovy |   2 +-
 ...tCommit_Java_ValidatesRunner_Dataflow.groovy |   2 +-
 .../job_beam_Release_NightlySnapshot.groovy     |   2 +-
 examples/java/pom.xml                           |   4 +-
 .../dataflow/DataflowPipelineRegistrar.java     |   2 +-
 .../dataflow/TestDataflowPipelineOptions.java   |  28 +
 .../runners/dataflow/TestDataflowRunner.java    | 322 +++++++++
 .../testing/TestDataflowPipelineOptions.java    |  28 -
 .../dataflow/testing/TestDataflowRunner.java    | 325 ---------
 .../runners/dataflow/testing/package-info.java  |  24 -
 .../runners/dataflow/DataflowMetricsTest.java   |   1 -
 .../dataflow/DataflowPipelineJobTest.java       |   1 -
 .../dataflow/DataflowPipelineRegistrarTest.java |   2 +-
 .../dataflow/TestDataflowRunnerTest.java        | 652 ++++++++++++++++++
 .../testing/TestDataflowRunnerTest.java         | 655 -------------------
 .../apache/beam/sdk/testing/TestPipeline.java   |   2 +-
 16 files changed, 1010 insertions(+), 1042 deletions(-)
----------------------------------------------------------------------