You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:30 UTC

[37/50] [abbrv] incubator-beam git commit: Rename DataflowPipelineRunner to DataflowRunner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
deleted file mode 100644
index fbaf116..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.Json;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-/** Tests for {@link TestDataflowPipelineRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowPipelineRunnerTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-  @Mock private MockHttpTransport transport;
-  @Mock private MockLowLevelHttpRequest request;
-  @Mock private GcsUtil mockGcsUtil;
-
-  private TestDataflowPipelineOptions options;
-  private Dataflow service;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
-    doCallRealMethod().when(request).getContentAsString();
-    service = new Dataflow(transport, Transport.getJsonFactory(), null);
-
-    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setAppName("TestAppName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setTempRoot("gs://test");
-    options.setGcpCredential(new TestCredential());
-    options.setDataflowClient(service);
-    options.setRunner(TestDataflowPipelineRunner.class);
-    options.setPathValidatorClass(NoopPathValidator.class);
-  }
-
-  @Test
-  public void testToString() {
-    assertEquals("TestDataflowPipelineRunner#TestAppName",
-        new TestDataflowPipelineRunner(options).toString());
-  }
-
-  @Test
-  public void testRunBatchJobThatSucceeds() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    assertEquals(mockJob, runner.run(p, mockRunner));
-  }
-
-  @Test
-  public void testRunBatchJobThatFails() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testBatchPipelineFailsIfException() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, atLeastOnce()).cancel();
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testRunStreamingJobThatSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testRunStreamingJobThatFails() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    doReturn(State.DONE).when(job).getState();
-    assertEquals(Optional.of(true), runner.checkForSuccess(job));
-  }
-
-  @Test
-  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    doReturn(State.DONE).when(job).getState();
-    assertEquals(Optional.of(false), runner.checkForSuccess(job));
-  }
-
-  @Test
-  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, false /* tentative */));
-    doReturn(State.RUNNING).when(job).getState();
-    assertEquals(Optional.absent(), runner.checkForSuccess(job));
-  }
-
-  private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
-      throws Exception {
-    MetricStructuredName name = new MetricStructuredName();
-    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
-    name.setContext(
-        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
-
-    MetricUpdate metric = new MetricUpdate();
-    metric.setName(name);
-    metric.setScalar(BigDecimal.ONE);
-
-    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
-    response.setContentType(Json.MEDIA_TYPE);
-    JobMetrics jobMetrics = new JobMetrics();
-    jobMetrics.setMetrics(Lists.newArrayList(metric));
-    // N.B. Setting the factory is necessary in order to get valid JSON.
-    jobMetrics.setFactory(Transport.getJsonFactory());
-    response.setContent(jobMetrics.toPrettyString());
-    return response;
-  }
-
-  @Test
-  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, false /* tentative */));
-    doReturn(State.FAILED).when(job).getState();
-    assertEquals(Optional.of(false), runner.checkForSuccess(job));
-  }
-
-  @Test
-  public void testStreamingPipelineFailsIfException() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, atLeastOnce()).cancel();
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testBatchOnCreateMatcher() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testStreamingOnCreateMatcher() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestFailureMatcher());
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
-          any(JobMessagesHandler.class));
-      return;
-    }
-    fail("Expected an exception on pipeline failure.");
-  }
-
-  @Test
-  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getDataflowClient()).thenReturn(service);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
-    p.getOptions().as(TestPipelineOptions.class)
-        .setOnSuccessMatcher(new TestFailureMatcher());
-
-    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.FAILED);
-
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
-          any(JobMessagesHandler.class));
-      return;
-    }
-    fail("Expected an exception on pipeline failure.");
-  }
-
-  static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
-      SerializableMatcher<PipelineResult> {
-    private final DataflowPipelineJob mockJob;
-    private final int called;
-
-    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
-      this.mockJob = job;
-      this.called = times;
-    }
-
-    @Override
-    public boolean matches(Object o) {
-      if (!(o instanceof PipelineResult)) {
-        fail(String.format("Expected PipelineResult but received %s", o));
-      }
-      try {
-        verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
-            any(JobMessagesHandler.class));
-      } catch (IOException | InterruptedException e) {
-        throw new AssertionError(e);
-      }
-      assertSame(mockJob, o);
-      return true;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-    }
-  }
-
-  static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
-      SerializableMatcher<PipelineResult> {
-    @Override
-    public boolean matches(Object o) {
-      fail("OnSuccessMatcher should not be called on pipeline failure.");
-      return false;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
new file mode 100644
index 0000000..4067f08
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.testing;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for {@link TestDataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowRunnerTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Mock private MockHttpTransport transport;
+  @Mock private MockLowLevelHttpRequest request;
+  @Mock private GcsUtil mockGcsUtil;
+
+  private TestDataflowPipelineOptions options;
+  private Dataflow service;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
+    doCallRealMethod().when(request).getContentAsString();
+    service = new Dataflow(transport, Transport.getJsonFactory(), null);
+
+    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setAppName("TestAppName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setTempRoot("gs://test");
+    options.setGcpCredential(new TestCredential());
+    options.setDataflowClient(service);
+    options.setRunner(TestDataflowPipelineRunner.class);
+    options.setPathValidatorClass(NoopPathValidator.class);
+  }
+
+  @Test
+  public void testToString() {
+    assertEquals("TestDataflowPipelineRunner#TestAppName",
+        new TestDataflowPipelineRunner(options).toString());
+  }
+
+  @Test
+  public void testRunBatchJobThatSucceeds() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    assertEquals(mockJob, runner.run(p, mockRunner));
+  }
+
+  @Test
+  public void testRunBatchJobThatFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testBatchPipelineFailsIfException() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testRunStreamingJobThatSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobThatFails() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    doReturn(State.DONE).when(job).getState();
+    assertEquals(Optional.of(true), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    doReturn(State.DONE).when(job).getState();
+    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, false /* tentative */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.absent(), runner.checkForSuccess(job));
+  }
+
+  private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
+      throws Exception {
+    MetricStructuredName name = new MetricStructuredName();
+    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+    name.setContext(
+        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
+
+    MetricUpdate metric = new MetricUpdate();
+    metric.setName(name);
+    metric.setScalar(BigDecimal.ONE);
+
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(Lists.newArrayList(metric));
+    // N.B. Setting the factory is necessary in order to get valid JSON.
+    jobMetrics.setFactory(Transport.getJsonFactory());
+    response.setContent(jobMetrics.toPrettyString());
+    return response;
+  }
+
+  @Test
+  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, false /* tentative */));
+    doReturn(State.FAILED).when(job).getState();
+    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testStreamingPipelineFailsIfException() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testBatchOnCreateMatcher() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testStreamingOnCreateMatcher() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestFailureMatcher());
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+          any(JobMessagesHandler.class));
+      return;
+    }
+    fail("Expected an exception on pipeline failure.");
+  }
+
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    p.getOptions().as(TestPipelineOptions.class)
+        .setOnSuccessMatcher(new TestFailureMatcher());
+
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenReturn(State.FAILED);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class),
+          any(JobMessagesHandler.class));
+      return;
+    }
+    fail("Expected an exception on pipeline failure.");
+  }
+
+  static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
+      SerializableMatcher<PipelineResult> {
+    private final DataflowPipelineJob mockJob;
+    private final int called;
+
+    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
+      this.mockJob = job;
+      this.called = times;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof PipelineResult)) {
+        fail(String.format("Expected PipelineResult but received %s", o));
+      }
+      try {
+        verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class),
+            any(JobMessagesHandler.class));
+      } catch (IOException | InterruptedException e) {
+        throw new AssertionError(e);
+      }
+      assertSame(mockJob, o);
+      return true;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+    }
+  }
+
+  static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
+      SerializableMatcher<PipelineResult> {
+    @Override
+    public boolean matches(Object o) {
+      fail("OnSuccessMatcher should not be called on pipeline failure.");
+      return false;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
index 0b865c3..d809cc6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.transforms;
 
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.GcpOptions;
@@ -30,7 +30,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Factory methods for creating {@link DisplayDataEvaluator} instances against the
- * {@link DataflowPipelineRunner}.
+ * {@link DataflowRunner}.
  */
 public final class DataflowDisplayDataEvaluator {
   /** Do not instantiate. */
@@ -43,7 +43,7 @@ public final class DataflowDisplayDataEvaluator {
   public static DataflowPipelineOptions getDefaultOptions() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
 
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
     options.setProject("foobar");
     options.setTempLocation("gs://bucket/tmpLocation");
     options.setFilesToStage(Lists.<String>newArrayList());
@@ -56,7 +56,7 @@ public final class DataflowDisplayDataEvaluator {
 
   /**
    * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
-   * the {@link DataflowPipelineRunner}.
+   * the {@link DataflowRunner}.
    */
   public static DisplayDataEvaluator create() {
     return create(getDefaultOptions());
@@ -64,7 +64,7 @@ public final class DataflowDisplayDataEvaluator {
 
   /**
    * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against
-   * the {@link DataflowPipelineRunner} with the specified {@code options}.
+   * the {@link DataflowRunner} with the specified {@code options}.
    */
   public static DisplayDataEvaluator create(DataflowPipelineOptions options) {
     return DisplayDataEvaluator.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index f0e677e..a44b8a7 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.transforms;
 
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -46,20 +46,20 @@ import org.junit.runners.JUnit4;
 import java.util.Arrays;
 import java.util.List;
 
-/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */
+/** Tests for {@link GroupByKey} for the {@link DataflowRunner}. */
 @RunWith(JUnit4.class)
 public class DataflowGroupByKeyTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   /**
-   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+   * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey}
    * is not expanded. This is used for verifying that even without expansion the proper errors show
    * up.
    */
   private Pipeline createTestServiceRunner() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
     options.setProject("someproject");
     options.setStagingLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index d787500..1b263d2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.transforms;
 
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -44,7 +44,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
+/** Tests for {@link View} for a {@link DataflowRunner}. */
 @RunWith(JUnit4.class)
 public class DataflowViewTest {
   @Rule
@@ -52,7 +52,7 @@ public class DataflowViewTest {
 
   private Pipeline createTestBatchRunner() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
     options.setProject("someproject");
     options.setStagingLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
@@ -62,7 +62,7 @@ public class DataflowViewTest {
 
   private Pipeline createTestStreamingRunner() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
     options.setStreaming(true);
     options.setProject("someproject");
     options.setStagingLocation("gs://staging");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
index 5587986..a91f56c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
@@ -21,7 +21,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -52,7 +52,7 @@ public class DataflowPathValidatorTest {
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setGcpCredential(new TestCredential());
-    options.setRunner(DataflowPipelineRunner.class);
+    options.setRunner(DataflowRunner.class);
     options.setGcsUtil(mockGcsUtil);
     validator = new DataflowPathValidator(options);
   }
@@ -66,7 +66,7 @@ public class DataflowPathValidatorTest {
   public void testInvalidFilePattern() {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
-        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+        "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
     validator.validateInputFilePatternSupported("/local/path");
   }
 
@@ -88,7 +88,7 @@ public class DataflowPathValidatorTest {
   public void testInvalidOutputPrefix() {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
-        "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'");
+        "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
     validator.validateOutputFilePrefixSupported("/local/path");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 61ad24f..2b4464d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -113,7 +113,7 @@ public class SimpleWordCountTest {
       String[] words = WORD_BOUNDARY.split(c.element());
 
       // Keep track of the number of lines without any words encountered while tokenizing.
-      // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.
+      // This aggregator is visible in the monitoring UI when run using DataflowRunner.
       if (words.length == 0) {
         emptyLines.addValue(1L);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 22a2241..de3c152 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -158,7 +158,7 @@ public class SerializationTest {
       String[] words = WORD_BOUNDARY.split(c.element().toString());
 
       // Keep track of the number of lines without any words encountered while tokenizing.
-      // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.
+      // This aggregator is visible in the monitoring UI when run using DataflowRunner.
       if (words.length == 0) {
         emptyLines.addValue(1L);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
index 479090f..8719384 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java
@@ -17,16 +17,16 @@
  */
 /**
  * Defines runners for executing Pipelines in different modes, including
- * {@link org.apache.beam.sdk.runners.DirectPipelineRunner} and
- * {@link org.apache.beam.sdk.runners.DataflowPipelineRunner}.
+ * {@link org.apache.beam.sdk.runners.DirectRunner} and
+ * {@link org.apache.beam.sdk.runners.DataflowRunner}.
  *
- * <p>{@link org.apache.beam.sdk.runners.DirectPipelineRunner} executes a {@code Pipeline}
+ * <p>{@link org.apache.beam.sdk.runners.DirectRunner} executes a {@code Pipeline}
  * locally, without contacting the Dataflow service.
- * {@link org.apache.beam.sdk.runners.DataflowPipelineRunner} submits a
+ * {@link org.apache.beam.sdk.runners.DataflowRunner} submits a
  * {@code Pipeline} to the Dataflow service, which executes it on Dataflow-managed Compute Engine
- * instances. {@code DataflowPipelineRunner} returns
+ * instances. {@code DataflowRunner} returns
  * as soon as the {@code Pipeline} has been submitted. Use
- * {@link org.apache.beam.sdk.runners.BlockingDataflowPipelineRunner} to have execution
+ * {@link org.apache.beam.sdk.runners.BlockingDataflowRunner} to have execution
  * updates printed to the console.
  *
  * <p>The runner is specified as part {@link org.apache.beam.sdk.options.PipelineOptions}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 0dba043..b901268 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -60,7 +60,7 @@ import javax.annotation.Nullable;
  *   <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
  *   options. For example:
  *   <pre>{@code [
- *     "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner",
+ *     "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner",
  *     "--project=mygcpproject",
  *     "--stagingLocation=gs://mygcsbucket/path"
  *     ]}</pre>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index 4c98123..329dec5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  * {@code Aggregator} by calling {@link Aggregator#addValue}.
  *
  * <p>Aggregators are visible in the monitoring UI, when the pipeline is run
- * using DataflowPipelineRunner or BlockingDataflowPipelineRunner, along with
+ * using DataflowRunner or BlockingDataflowRunner, along with
  * their current value. Aggregators may not become visible until the system
  * begins executing the ParDo transform that created them and/or their initial
  * value is changed.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
index ad41a3f..3865654 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java
@@ -156,7 +156,7 @@ public class BigQueryTableRowIterator implements AutoCloseable {
       if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
         // Embed schema information into the raw row, so that values have an
         // associated key.  This matches how rows are read when using the
-        // DataflowPipelineRunner.
+        // DataflowRunner.
         current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
         return true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
index fb8bb72..f9ce018 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java
@@ -37,7 +37,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 /**
- * Tests for DataflowPipelineRunner.
+ * Tests for DataflowRunner.
  */
 @RunWith(JUnit4.class)
 public class PipelineRunnerTest {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index 43c990a..3306cb4 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -70,7 +70,7 @@ import java.util.regex.Pattern;
  * <pre>{@code
  *   --project=YOUR_PROJECT_ID
  *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
+ *   --runner=BlockingDataflowRunner
  *   --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
  * }
  * </pre>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index 3e4fc86..98af2e7 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -17,7 +17,7 @@
  */
 package ${package};
 
-import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -63,7 +63,7 @@ public class MinimalWordCount {
     // in Google Cloud Storage to stage files.
     DataflowPipelineOptions options = PipelineOptionsFactory.create()
       .as(DataflowPipelineOptions.class);
-    options.setRunner(BlockingDataflowPipelineRunner.class);
+    options.setRunner(BlockingDataflowRunner.class);
     // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
     options.setProject("SET_YOUR_PROJECT_ID_HERE");
     // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
index 7dea9fe..8e56b03 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
@@ -82,7 +82,7 @@ import java.util.List;
  * <pre>{@code
  *   --project=YOUR_PROJECT_ID
  *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
+ *   --runner=BlockingDataflowRunner
  * }
  * </pre>
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index fc1f4b5..07ed6d0 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -77,7 +77,7 @@ import org.apache.beam.sdk.values.PCollection;
  * <pre>{@code
  *   --project=YOUR_PROJECT_ID
  *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
+ *   --runner=BlockingDataflowRunner
  * }
  * </pre>
  * and an output prefix on GCS:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
index 6ec4540..82f0eff 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java
@@ -17,9 +17,9 @@
  */
 package ${package}.common;
 
-import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 
@@ -251,10 +251,10 @@ public class DataflowExampleUtils {
   }
 
   public void setupRunner() {
-    if (options.isStreaming() && options.getRunner().equals(BlockingDataflowPipelineRunner.class)) {
+    if (options.isStreaming() && options.getRunner().equals(BlockingDataflowRunner.class)) {
       // In order to cancel the pipelines automatically,
-      // {@literal DataflowPipelineRunner} is forced to be used.
-      options.setRunner(DataflowPipelineRunner.class);
+      // {@literal DataflowRunner} is forced to be used.
+      options.setRunner(DataflowRunner.class);
     }
   }
 
@@ -268,7 +268,7 @@ public class DataflowExampleUtils {
     DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class);
     copiedOptions.setStreaming(false);
     copiedOptions.setWorkerHarnessContainerImage(
-        DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
+        DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE);
     copiedOptions.setNumWorkers(
         options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers());
     copiedOptions.setJobName(options.getJobName() + "-injector");
@@ -298,7 +298,7 @@ public class DataflowExampleUtils {
   }
 
   /**
-   * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used,
+   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
    * waits for the pipeline to finish and cancels it (and the injector) before the program exists.
    */
   public void waitToFinish(PipelineResult result) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
index 027431f..9a75bb7 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * Platform, you should specify the following command-line options:
  *   --project=<YOUR_PROJECT_ID>
  *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- *   --runner=BlockingDataflowPipelineRunner
+ *   --runner=BlockingDataflowRunner
  */
 public class StarterPipeline {
   private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
index bb86b0d..8c71d9d 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * Platform, you should specify the following command-line options:
  *   --project=<YOUR_PROJECT_ID>
  *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
- *   --runner=BlockingDataflowPipelineRunner
+ *   --runner=BlockingDataflowRunner
  */
 public class StarterPipeline {
   private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d028ac6/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 09fe7d9..55aea6a 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -37,7 +37,7 @@
     <module>core</module>
     <module>io</module>
     <!-- sdks/java/maven-archtypes has several dependencies on the
-         DataflowPipelineRunner. Until these are refactored out or
+         DataflowRunner. Until these are refactored out or
          a released artifact exists, we need to modify the build order.
     <module>maven-archetypes</module> -->
     <module>extensions</module>