You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/17 21:46:55 UTC

[24/50] [abbrv] beam git commit: Remove Pipeline.getRunner

Remove Pipeline.getRunner

Runners need not be instantiated until after pipeline construction, so
they should not be exposed by the Pipeline class.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d41fe1df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d41fe1df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d41fe1df

Branch: refs/heads/gearpump-runner
Commit: d41fe1df26329479b82cc59d260998f2b88b4799
Parents: 2c2424c
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 2 10:54:29 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 10 09:40:50 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 95 ++++++++++----------
 .../direct/TestStreamEvaluatorFactory.java      | 22 +++--
 .../direct/TestStreamEvaluatorFactoryTest.java  |  6 +-
 .../BatchStatefulParDoOverridesTest.java        |  4 +-
 .../DataflowPipelineTranslatorTest.java         | 39 ++++----
 .../runners/dataflow/DataflowRunnerTest.java    |  4 +-
 .../testing/TestDataflowRunnerTest.java         | 50 +++++------
 .../main/java/org/apache/beam/sdk/Pipeline.java |  7 --
 8 files changed, 110 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f56d225..ce8dbc0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -70,53 +70,6 @@ import org.joda.time.Instant;
  */
 public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   /**
-   * The default set of transform overrides to use in the {@link DirectRunner}.
-   *
-   * <p>The order in which overrides is applied is important, as some overrides are expanded into a
-   * composite. If the composite contains {@link PTransform PTransforms} which are also overridden,
-   * these PTransforms must occur later in the iteration order. {@link ImmutableMap} has an
-   * iteration order based on the order at which elements are added to it.
-   */
-  @SuppressWarnings("rawtypes")
-  private static Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides =
-      ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
-          .put(
-              PTransformMatchers.writeWithRunnerDeterminedSharding(),
-              new WriteWithShardingFactory()) /* Uses a view internally. */
-          .put(
-              PTransformMatchers.classEqualTo(CreatePCollectionView.class),
-              new ViewOverrideFactory()) /* Uses pardos and GBKs */
-          .put(
-              PTransformMatchers.classEqualTo(TestStream.class),
-              new DirectTestStreamFactory()) /* primitive */
-          /* Single-output ParDos are implemented in terms of Multi-output ParDos. Any override
-          that is applied to a multi-output ParDo must first have all matching Single-output ParDos
-          converted to match.
-           */
-          .put(PTransformMatchers.splittableParDoSingle(), new ParDoSingleViaMultiOverrideFactory())
-          .put(
-              PTransformMatchers.stateOrTimerParDoSingle(),
-              new ParDoSingleViaMultiOverrideFactory())
-          // SplittableParMultiDo is implemented in terms of nonsplittable single ParDos
-          .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
-          // state and timer pardos are implemented in terms of nonsplittable single ParDos
-          .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
-          .put(
-              PTransformMatchers.classEqualTo(ParDo.Bound.class),
-              new ParDoSingleViaMultiOverrideFactory()) /* returns a BoundMulti */
-          .put(
-              PTransformMatchers.classEqualTo(BoundMulti.class),
-              /* returns one of two primitives; SplittableParDos are replaced above. */
-              new ParDoMultiOverrideFactory())
-          .put(
-              PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
-              new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
-          .put(
-              PTransformMatchers.classEqualTo(GroupByKey.class),
-              new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
-          .build();
-
-  /**
    * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
    * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
    * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
@@ -309,7 +262,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @Override
   public DirectPipelineResult run(Pipeline pipeline) {
     for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        defaultTransformOverrides.entrySet()) {
+        defaultTransformOverrides().entrySet()) {
       pipeline.replace(override.getKey(), override.getValue());
     }
     MetricsEnvironment.setMetricsSupported(true);
@@ -361,6 +314,52 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   }
 
   /**
+   * The default set of transform overrides to use in the {@link DirectRunner}.
+   *
+   * <p>The order in which overrides is applied is important, as some overrides are expanded into a
+   * composite. If the composite contains {@link PTransform PTransforms} which are also overridden,
+   * these PTransforms must occur later in the iteration order. {@link ImmutableMap} has an
+   * iteration order based on the order at which elements are added to it.
+   */
+  @SuppressWarnings("rawtypes")
+  private Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides() {
+    return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
+        .put(
+            PTransformMatchers.writeWithRunnerDeterminedSharding(),
+            new WriteWithShardingFactory()) /* Uses a view internally. */
+        .put(
+            PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+            new ViewOverrideFactory()) /* Uses pardos and GBKs */
+        .put(
+            PTransformMatchers.classEqualTo(TestStream.class),
+            new DirectTestStreamFactory(this)) /* primitive */
+        /* Single-output ParDos are implemented in terms of Multi-output ParDos. Any override
+        that is applied to a multi-output ParDo must first have all matching Single-output ParDos
+        converted to match.
+         */
+        .put(PTransformMatchers.splittableParDoSingle(), new ParDoSingleViaMultiOverrideFactory())
+        .put(PTransformMatchers.stateOrTimerParDoSingle(), new ParDoSingleViaMultiOverrideFactory())
+        // SplittableParMultiDo is implemented in terms of nonsplittable single ParDos
+        .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
+        // state and timer pardos are implemented in terms of nonsplittable single ParDos
+        .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
+        .put(
+            PTransformMatchers.classEqualTo(ParDo.Bound.class),
+            new ParDoSingleViaMultiOverrideFactory()) /* returns a BoundMulti */
+        .put(
+            PTransformMatchers.classEqualTo(BoundMulti.class),
+            /* returns one of two primitives; SplittableParDos are replaced above. */
+            new ParDoMultiOverrideFactory())
+        .put(
+            PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
+            new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
+        .put(
+            PTransformMatchers.classEqualTo(GroupByKey.class),
+            new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
+        .build();
+  }
+
+  /**
    * The result of running a {@link Pipeline} with the {@link DirectRunner}.
    *
    * <p>Throws {@link UnsupportedOperationException} for all methods.

http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 628aa23..0dd8919 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
@@ -35,7 +33,6 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.TestStream.ElementEvent;
 import org.apache.beam.sdk.testing.TestStream.Event;
@@ -166,11 +163,16 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
 
   static class DirectTestStreamFactory<T>
       implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
+    private final DirectRunner runner;
+
+    DirectTestStreamFactory(DirectRunner runner) {
+      this.runner = runner;
+    }
 
     @Override
     public PTransform<PBegin, PCollection<T>> getReplacementTransform(
         TestStream<T> transform) {
-      return new DirectTestStream<>(transform);
+      return new DirectTestStream<>(runner, transform);
     }
 
     @Override
@@ -185,22 +187,18 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
+      private final transient DirectRunner runner;
       private final TestStream<T> original;
 
       @VisibleForTesting
-      DirectTestStream(TestStream<T> transform) {
+      DirectTestStream(DirectRunner runner, TestStream<T> transform) {
+        this.runner = runner;
         this.original = transform;
       }
 
       @Override
       public PCollection<T> expand(PBegin input) {
-        PipelineRunner<?> runner = input.getPipeline().getRunner();
-        checkState(
-            runner instanceof DirectRunner,
-            "%s can only be used when running with the %s",
-            getClass().getSimpleName(),
-            DirectRunner.class.getSimpleName());
-        ((DirectRunner) runner).setClockSupplier(new TestClockSupplier());
+        runner.setClockSupplier(new TestClockSupplier());
         return PCollection.<T>createPrimitiveOutputInternal(
                 input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
             .setCoder(original.getValueCoder());

http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 9ed72d5..fc689fe 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -60,10 +60,12 @@ public class TestStreamEvaluatorFactoryTest {
 
   @Rule
   public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  private DirectRunner runner;
 
   @Before
   public void setup() {
     context = mock(EvaluationContext.class);
+    runner = DirectRunner.fromOptions(TestPipeline.testingPipelineOptions());
     factory = new TestStreamEvaluatorFactory(context);
     bundleFactory = ImmutableListBundleFactory.create();
   }
@@ -80,7 +82,7 @@ public class TestStreamEvaluatorFactoryTest {
         .advanceProcessingTime(Duration.standardMinutes(10))
         .advanceWatermarkToInfinity();
     PCollection<Integer> streamVals =
-        p.apply(new DirectTestStream<Integer>(testStream));
+        p.apply(new DirectTestStream<Integer>(runner, testStream));
 
     TestClock clock = new TestClock();
     when(context.getClock()).thenReturn(clock);
@@ -180,7 +182,7 @@ public class TestStreamEvaluatorFactoryTest {
 
   @Test
   public void overrideFactoryGetInputSucceeds() {
-    DirectTestStreamFactory<?> factory = new DirectTestStreamFactory<>();
+    DirectTestStreamFactory<?> factory = new DirectTestStreamFactory<>(runner);
     PBegin begin = factory.getInput(Collections.<TaggedPValue>emptyList(), p);
     assertThat(begin.getPipeline(), Matchers.<Pipeline>equalTo(p));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index ef3e414..899902a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -71,7 +71,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
     DummyStatefulDoFn fn = new DummyStatefulDoFn();
     pipeline.apply(Create.of(KV.of(1, 2))).apply(ParDo.of(fn));
 
-    DataflowRunner runner = (DataflowRunner) pipeline.getRunner();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(pipeline);
     assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
   }
@@ -89,7 +89,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
         .apply(Create.of(KV.of(1, 2)))
         .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(fn));
 
-    DataflowRunner runner = (DataflowRunner) pipeline.getRunner();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(pipeline);
     assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 660e92e..813e76d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -190,7 +190,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
@@ -223,7 +223,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -240,7 +240,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -259,7 +259,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -276,7 +276,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -292,7 +292,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -328,7 +328,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -363,7 +363,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -397,7 +397,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -417,7 +417,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -438,7 +438,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Job job =
         DataflowPipelineTranslator.fromOptions(options)
             .translate(
-                p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+                p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList())
             .getJob();
 
     assertEquals(1, job.getEnvironment().getWorkerPools().size());
@@ -653,7 +653,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
   @Test
   public void testMultiGraphPipelineSerialization() throws Exception {
-    Pipeline p = Pipeline.create(buildPipelineOptions());
+    DataflowPipelineOptions options = buildPipelineOptions();
+    Pipeline p = Pipeline.create(options);
 
     PCollection<Integer> input = p.begin()
         .apply(Create.of(1, 2, 3));
@@ -666,7 +667,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
     // Check that translation doesn't fail.
     JobSpecification jobSpecification = t.translate(
-        p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+        p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList());
     assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
   }
 
@@ -710,7 +711,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     // Check that translation doesn't fail.
     JobSpecification jobSpecification = t.translate(
         pipeline,
-        (DataflowRunner) pipeline.getRunner(),
+        DataflowRunner.fromOptions(options),
         Collections.<DataflowPackage>emptyList());
     assertAllStepOutputsHaveUniqueIds(jobSpecification.getJob());
   }
@@ -737,7 +738,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
     t.translate(
         pipeline,
-        (DataflowRunner) pipeline.getRunner(),
+        DataflowRunner.fromOptions(options),
         Collections.<DataflowPackage>emptyList());
   }
 
@@ -764,7 +765,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     // Check that translation does not fail.
     t.translate(
         pipeline,
-        (DataflowRunner) pipeline.getRunner(),
+        DataflowRunner.fromOptions(options),
         Collections.<DataflowPackage>emptyList());
   }
 
@@ -785,7 +786,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowRunner) pipeline.getRunner(),
+                DataflowRunner.fromOptions(options),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
     assertAllStepOutputsHaveUniqueIds(job);
@@ -817,7 +818,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowRunner) pipeline.getRunner(),
+                DataflowRunner.fromOptions(options),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
     assertAllStepOutputsHaveUniqueIds(job);
@@ -1011,7 +1012,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         translator
             .translate(
                 pipeline,
-                (DataflowRunner) pipeline.getRunner(),
+                DataflowRunner.fromOptions(options),
                 Collections.<DataflowPackage>emptyList())
             .getJob();
     assertAllStepOutputsHaveUniqueIds(job);

http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a788077..a4031d1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -951,7 +951,7 @@ public class DataflowRunnerTest {
     thrown.expectMessage(Matchers.containsString("no translator registered"));
     DataflowPipelineTranslator.fromOptions(options)
         .translate(
-            p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+            p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList());
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
     Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
@@ -989,7 +989,7 @@ public class DataflowRunnerTest {
         });
 
     translator.translate(
-        p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+        p, DataflowRunner.fromOptions(options), Collections.<DataflowPackage>emptyList());
     assertTrue(transform.translated);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 1e906d2..d3eccbb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -140,7 +140,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */,
         true /* tentative */, null /* additionalMetrics */));
     assertEquals(mockJob, runner.run(p, mockRunner));
@@ -160,7 +160,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
         false /* tentative */, null /* additionalMetrics */));
     try {
@@ -202,7 +202,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
         true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -233,7 +233,7 @@ public class TestDataflowRunnerTest {
     when(request.execute())
         .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */,
             ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     runner.run(p, mockRunner);
   }
 
@@ -254,7 +254,7 @@ public class TestDataflowRunnerTest {
     when(request.execute())
         .thenReturn(generateMockStreamingMetricResponse(
             ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     runner.run(p, mockRunner);
   }
 
@@ -275,7 +275,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
         true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -350,7 +350,7 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     doReturn(State.DONE).when(job).getState();
     JobMetrics metrics = buildJobMetrics(
         generateMockMetrics(true /* success */, true /* tentative */));
@@ -364,7 +364,7 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     doReturn(State.DONE).when(job).getState();
     JobMetrics metrics = buildJobMetrics(
         generateMockMetrics(false /* success */, true /* tentative */));
@@ -392,7 +392,7 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
         ImmutableMap.of("no-watermark", new BigDecimal(100))));
     doReturn(State.RUNNING).when(job).getState();
@@ -405,7 +405,7 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
         ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     doReturn(State.RUNNING).when(job).getState();
@@ -418,7 +418,7 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
         ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
     doReturn(State.RUNNING).when(job).getState();
@@ -431,7 +431,7 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics
         (ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
     doReturn(State.RUNNING).when(job).getState();
@@ -444,7 +444,7 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
         ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
             "two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
@@ -458,7 +458,7 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
         ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
             "two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
@@ -472,7 +472,7 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
         ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
             "no-watermark", new BigDecimal(100))));
@@ -487,7 +487,7 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     doReturn(State.FAILED).when(job).getState();
     assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null /* metrics */));
   }
@@ -522,7 +522,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */,
         true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -543,7 +543,7 @@ public class TestDataflowRunnerTest {
 
     when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */,
         true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     JobMetrics metrics = runner.getJobMetrics(job);
 
     assertEquals(1, metrics.getMetrics().size());
@@ -558,7 +558,7 @@ public class TestDataflowRunnerTest {
     p.apply(Create.of(1, 2, 3));
 
     when(request.execute()).thenThrow(new IOException());
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     assertNull(runner.getJobMetrics(job));
   }
 
@@ -576,7 +576,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
@@ -600,7 +600,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
@@ -627,7 +627,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
@@ -651,7 +651,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
@@ -678,7 +678,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 
@@ -709,7 +709,7 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = (TestDataflowRunner) p.getRunner();
+    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d41fe1df/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index f09f2b4..2f368b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -456,13 +456,6 @@ public class Pipeline {
   }
 
   /**
-   * Returns the configured {@link PipelineRunner}.
-   */
-  public PipelineRunner<?> getRunner() {
-    return runner;
-  }
-
-  /**
    * Returns the configured {@link PipelineOptions}.
    *
    * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly