You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/16 02:30:37 UTC

[02/12] incubator-beam git commit: Rename InProcessPipelineRunner to DirectRunner

Rename InProcessPipelineRunner to DirectRunner

Completes BEAM-243


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2bafda1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2bafda1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2bafda1d

Branch: refs/heads/master
Commit: 2bafda1de06c2f7a410b242c5a44787dd21a7396
Parents: 021fa1e
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 15 10:45:15 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 15 15:53:47 2016 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |   2 +-
 .../direct/AbstractModelEnforcement.java        |   2 +-
 .../direct/BoundedReadEvaluatorFactory.java     |   4 +-
 .../beam/runners/direct/BundleFactory.java      |   4 +-
 .../beam/runners/direct/CommittedResult.java    |   2 +-
 .../beam/runners/direct/CompletionCallback.java |   2 +-
 .../beam/runners/direct/DirectOptions.java      | 101 +++++
 .../beam/runners/direct/DirectRegistrar.java    |  55 +++
 .../beam/runners/direct/DirectRunner.java       | 371 +++++++++++++++++++
 .../direct/EncodabilityEnforcementFactory.java  |   2 +-
 .../direct/ExecutorServiceParallelExecutor.java |   2 +-
 .../runners/direct/FlattenEvaluatorFactory.java |   6 +-
 .../ImmutabilityCheckingBundleFactory.java      |   4 +-
 .../direct/ImmutabilityEnforcementFactory.java  |   2 +-
 .../direct/InMemoryWatermarkManager.java        |   2 +-
 .../runners/direct/InProcessBundleFactory.java  |   4 +-
 .../direct/InProcessBundleOutputManager.java    |   6 +-
 .../direct/InProcessEvaluationContext.java      |  20 +-
 .../direct/InProcessExecutionContext.java       |   4 +-
 .../beam/runners/direct/InProcessExecutor.java  |   2 +-
 ...rocessGroupAlsoByWindowEvaluatorFactory.java |   4 +-
 ...InProcessGroupByKeyOnlyEvaluatorFactory.java |   6 +-
 .../direct/InProcessPipelineOptions.java        | 101 -----
 .../runners/direct/InProcessPipelineRunner.java | 370 ------------------
 .../beam/runners/direct/InProcessRegistrar.java |  55 ---
 .../direct/InProcessTransformResult.java        |   2 +-
 .../beam/runners/direct/ModelEnforcement.java   |   2 +-
 .../runners/direct/ModelEnforcementFactory.java |   2 +-
 .../runners/direct/ParDoInProcessEvaluator.java |   4 +-
 .../direct/ParDoMultiEvaluatorFactory.java      |   4 +-
 .../direct/ParDoSingleEvaluatorFactory.java     |   4 +-
 .../direct/PassthroughTransformEvaluator.java   |   2 +-
 .../runners/direct/StepTransformResult.java     |   2 +-
 .../beam/runners/direct/TransformEvaluator.java |   2 +-
 .../direct/TransformEvaluatorFactory.java       |   2 +-
 .../direct/TransformEvaluatorRegistry.java      |   2 +-
 .../beam/runners/direct/TransformExecutor.java  |   2 +-
 .../direct/UnboundedReadEvaluatorFactory.java   |   4 +-
 .../runners/direct/ViewEvaluatorFactory.java    |   6 +-
 .../runners/direct/WindowEvaluatorFactory.java  |   6 +-
 .../direct/AvroIOShardedWriteFactoryTest.java   |   2 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java |   4 +-
 .../runners/direct/CommittedResultTest.java     |  12 +-
 .../runners/direct/DirectRegistrarTest.java     |  74 ++++
 .../beam/runners/direct/DirectRunnerTest.java   | 339 +++++++++++++++++
 .../EncodabilityEnforcementFactoryTest.java     |   2 +-
 .../direct/FlattenEvaluatorFactoryTest.java     |   4 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   4 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   4 +-
 .../ImmutabilityEnforcementFactoryTest.java     |   2 +-
 .../direct/InMemoryWatermarkManagerTest.java    |   4 +-
 .../direct/InProcessBundleFactoryTest.java      |   4 +-
 .../direct/InProcessEvaluationContextTest.java  |  10 +-
 ...ocessGroupByKeyOnlyEvaluatorFactoryTest.java |   4 +-
 .../direct/InProcessPipelineRegistrarTest.java  |  74 ----
 .../direct/InProcessPipelineRunnerTest.java     | 339 -----------------
 .../direct/ParDoInProcessEvaluatorTest.java     |   6 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |   4 +-
 .../direct/ParDoSingleEvaluatorFactoryTest.java |   4 +-
 .../direct/TextIOShardedWriteFactoryTest.java   |   2 +-
 .../runners/direct/TransformExecutorTest.java   |   2 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |   4 +-
 .../direct/ViewEvaluatorFactoryTest.java        |   4 +-
 .../direct/WindowEvaluatorFactoryTest.java      |   4 +-
 .../translation/TransformTranslatorTest.java    |   4 +-
 .../beam/sdk/options/PipelineOptions.java       |   2 +-
 testing/travis/test_wordcount.sh                |   4 +-
 67 files changed, 1051 insertions(+), 1050 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 8667aee..3010757 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -102,7 +102,7 @@
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [
-                    "--runner=org.apache.beam.runners.direct.InProcessPipelineRunner"
+                    "--runner=DirectRunner"
                   ]
                 </beamTestPipelineOptions>
               </systemPropertyVariables>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index 948beb6..2ae0275 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index f15d446..63d248a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.Read.Bounded;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index a0511df..a546cfb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 4a42e34..b241493 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 
 import com.google.auto.value.AutoValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 7c2c068..8ee4b44 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 
 /**
  * A callback for completing a bundle of input.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
new file mode 100644
index 0000000..3901c04
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Options that can be used to configure the {@link org.apache.beam.runners.direct.DirectRunner}.
+ */
+public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
+  /**
+   * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
+   * to execute {@link PTransform PTransforms}.
+   *
+   * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
+   * it cannot enter a state in which it will not schedule additional pending work unless currently
+   * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
+   *
+   * <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
+   * {@link Executors#newCachedThreadPool()}.
+   */
+  @JsonIgnore
+  @Required
+  @Hidden
+  @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
+  ExecutorServiceFactory getExecutorServiceFactory();
+
+  void setExecutorServiceFactory(ExecutorServiceFactory executorService);
+
+  /**
+   * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
+   * system time when time values are required by the evaluator.
+   */
+  @Default.InstanceFactory(NanosOffsetClock.Factory.class)
+  @JsonIgnore
+  @Required
+  @Hidden
+  @Description(
+      "The processing time source used by the pipeline. When the current time is "
+          + "needed by the evaluator, the result of clock#now() is used.")
+  Clock getClock();
+
+  void setClock(Clock clock);
+
+  @Default.Boolean(false)
+  @Description(
+      "If the pipeline should shut down producers which have reached the maximum "
+          + "representable watermark. If this is set to true, a pipeline in which all PTransforms "
+          + "have reached the maximum watermark will be shut down, even if there are unbounded "
+          + "sources that could produce additional (late) data. By default, if the pipeline "
+          + "contains any unbounded PCollections, it will run until explicitly shut down.")
+  boolean isShutdownUnboundedProducersWithMaxWatermark();
+
+  void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
+
+  @Default.Boolean(true)
+  @Description(
+      "If the pipeline should block awaiting completion of the pipeline. If set to true, "
+          + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, "
+          + "the Pipeline will execute asynchronously. If set to false, the completion of the "
+          + "pipeline can be awaited on by use of DirectPipelineResult#awaitCompletion().")
+  boolean isBlockOnRun();
+
+  void setBlockOnRun(boolean b);
+
+  @Default.Boolean(true)
+  @Description(
+      "Controls whether the runner should ensure that all of the elements of every "
+          + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
+          + "at any point, or output elements after they are output.")
+  boolean isTestImmutability();
+
+  void setTestImmutability(boolean test);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
new file mode 100644
index 0000000..eb027fa
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link DirectRunner}.
+ */
+public class DirectRegistrar {
+  private DirectRegistrar() {}
+  /**
+   * Registers the {@link DirectRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class InProcessRunner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link DirectOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class InProcessOptions implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(DirectOptions.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
new file mode 100644
index 0000000..2f5a0bc
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.MapAggregatorValues;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
+ * {@link PCollection PCollections}.
+ */
+@Experimental
+public class DirectRunner
+    extends PipelineRunner<DirectPipelineResult> {
+  /**
+   * The default set of transform overrides to use in the {@link DirectRunner}.
+   *
+   * <p>A transform override must have a single-argument constructor that takes an instance of the
+   * type of transform it is overriding.
+   */
+  @SuppressWarnings("rawtypes")
+  private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
+      defaultTransformOverrides =
+          ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
+              .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
+              .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
+              .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
+              .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
+              .build();
+
+  /**
+   * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
+   * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
+   * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
+   *
+   * @param <T> the type of elements that can be added to this bundle
+   */
+  public static interface UncommittedBundle<T> {
+    /**
+     * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
+     */
+    PCollection<T> getPCollection();
+
+    /**
+     * Outputs an element to this bundle.
+     *
+     * @param element the element to add to this bundle
+     * @return this bundle
+     */
+    UncommittedBundle<T> add(WindowedValue<T> element);
+
+    /**
+     * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
+     * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
+     * will throw an {@link IllegalStateException} if called after a call to commit.
+     * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
+     *                                   committed
+     */
+    CommittedBundle<T> commit(Instant synchronizedProcessingTime);
+  }
+
+  /**
+   * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
+   * eventually committed. Committed elements are executed by the {@link PTransform PTransforms}
+   * that consume the {@link PCollection} this bundle is
+   * a part of at a later point.
+   * @param <T> the type of elements contained within this bundle
+   */
+  public static interface CommittedBundle<T> {
+    /**
+     * Returns the PCollection that the elements of this bundle belong to.
+     */
+    PCollection<T> getPCollection();
+
+    /**
+     * Returns the key that was output in the most recent {@link GroupByKey} in the
+     * execution of this bundle.
+     */
+    StructuralKey<?> getKey();
+
+    /**
+     * Returns an {@link Iterable} containing all of the elements that have been added to this
+     * {@link CommittedBundle}.
+     */
+    Iterable<WindowedValue<T>> getElements();
+
+    /**
+     * Returns the processing time output watermark at the time the producing {@link PTransform}
+     * committed this bundle. Downstream synchronized processing time watermarks cannot progress
+     * past this point before consuming this bundle.
+     *
+     * <p>This value is no greater than the earliest incomplete processing time or synchronized
+     * processing time {@link TimerData timer} at the time this bundle was committed, including any
+     * timers that fired to produce this bundle.
+     */
+    Instant getSynchronizedProcessingOutputWatermark();
+
+    /**
+     * Return a new {@link CommittedBundle} that is like this one, except calls to
+     * {@link #getElements()} will return the provided elements. This bundle is unchanged.
+     *
+     * <p>
+     * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
+     * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
+     * the current bundle. This is used to ensure a {@link PTransform} that could not complete
+     * processing on input elements properly holds the synchronized processing time to the
+     * appropriate value.
+     */
+    CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
+  }
+
+  /**
+   * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
+   * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+   * @param <ElemT> the type of elements the input {@link PCollection} contains.
+   * @param <ViewT> the type of the PCollectionView this writer writes to.
+   */
+  public static interface PCollectionViewWriter<ElemT, ViewT> {
+    void add(Iterable<WindowedValue<ElemT>> values);
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+  private final DirectOptions options;
+
+  public static DirectRunner fromOptions(PipelineOptions options) {
+    return new DirectRunner(options.as(DirectOptions.class));
+  }
+
+  private DirectRunner(DirectOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Returns the {@link PipelineOptions} used to create this {@link DirectRunner}.
+   */
+  public DirectOptions getPipelineOptions() {
+    return options;
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass());
+    if (overrideFactory != null) {
+      PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform);
+
+      return super.apply(customTransform, input);
+    }
+    // If there is no override, or we should not apply the override, apply the original transform
+    return super.apply(transform, input);
+  }
+
+  @Override
+  public DirectPipelineResult run(Pipeline pipeline) {
+    ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
+    pipeline.traverseTopologically(consumerTrackingVisitor);
+    for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
+      unfinalized.finishSpecifying();
+    }
+    @SuppressWarnings("rawtypes")
+    KeyedPValueTrackingVisitor keyedPValueVisitor =
+        KeyedPValueTrackingVisitor.create(
+            ImmutableSet.<Class<? extends PTransform>>of(
+                GroupByKey.class, InProcessGroupByKeyOnly.class));
+    pipeline.traverseTopologically(keyedPValueVisitor);
+
+    DisplayDataValidator.validatePipeline(pipeline);
+
+    InProcessEvaluationContext context =
+        InProcessEvaluationContext.create(
+            getPipelineOptions(),
+            createBundleFactory(getPipelineOptions()),
+            consumerTrackingVisitor.getRootTransforms(),
+            consumerTrackingVisitor.getValueToConsumers(),
+            consumerTrackingVisitor.getStepNames(),
+            consumerTrackingVisitor.getViews());
+
+    // independent executor service for each run
+    ExecutorService executorService =
+        context.getPipelineOptions().getExecutorServiceFactory().create();
+    InProcessExecutor executor =
+        ExecutorServiceParallelExecutor.create(
+            executorService,
+            consumerTrackingVisitor.getValueToConsumers(),
+            keyedPValueVisitor.getKeyedPValues(),
+            TransformEvaluatorRegistry.defaultRegistry(),
+            defaultModelEnforcements(options),
+            context);
+    executor.start(consumerTrackingVisitor.getRootTransforms());
+
+    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+        new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
+    DirectPipelineResult result =
+        new DirectPipelineResult(executor, context, aggregatorSteps);
+    if (options.isBlockOnRun()) {
+      try {
+        result.awaitCompletion();
+      } catch (UserCodeException userException) {
+        throw new PipelineExecutionException(userException.getCause());
+      } catch (Throwable t) {
+        if (t instanceof RuntimeException) {
+          throw (RuntimeException) t;
+        }
+        throw new RuntimeException(t);
+      }
+    }
+    return result;
+  }
+
+  private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+      defaultModelEnforcements(DirectOptions options) {
+    ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+        enforcements = ImmutableMap.builder();
+    Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
+    enforcements.put(ParDo.Bound.class, parDoEnforcements);
+    enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
+    return enforcements.build();
+  }
+
+  private Collection<ModelEnforcementFactory> createParDoEnforcements(
+      DirectOptions options) {
+    ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
+    if (options.isTestImmutability()) {
+      enforcements.add(ImmutabilityEnforcementFactory.create());
+    }
+    return enforcements.build();
+  }
+
+  private BundleFactory createBundleFactory(DirectOptions pipelineOptions) {
+    BundleFactory bundleFactory = InProcessBundleFactory.create();
+    if (pipelineOptions.isTestImmutability()) {
+      bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
+    }
+    return bundleFactory;
+  }
+
+  /**
+   * The result of running a {@link Pipeline} with the {@link DirectRunner}.
+   *
+   * Throws {@link UnsupportedOperationException} for all methods.
+   */
+  public static class DirectPipelineResult implements PipelineResult {
+    private final InProcessExecutor executor;
+    private final InProcessEvaluationContext evaluationContext;
+    private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
+    private State state;
+
+    private DirectPipelineResult(
+        InProcessExecutor executor,
+        InProcessEvaluationContext evaluationContext,
+        Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
+      this.executor = executor;
+      this.evaluationContext = evaluationContext;
+      this.aggregatorSteps = aggregatorSteps;
+      // Only ever constructed after the executor has started.
+      this.state = State.RUNNING;
+    }
+
+    @Override
+    public State getState() {
+      return state;
+    }
+
+    @Override
+    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+        throws AggregatorRetrievalException {
+      CounterSet counters = evaluationContext.getCounters();
+      Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
+      Map<String, T> stepValues = new HashMap<>();
+      for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
+        if (steps.contains(transform.getTransform())) {
+          String stepName =
+              String.format(
+                  "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
+          Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
+          if (counter != null) {
+            stepValues.put(transform.getFullName(), counter.getAggregate());
+          }
+        }
+      }
+      return new MapAggregatorValues<>(stepValues);
+    }
+
+    /**
+     * Blocks until the {@link Pipeline} execution represented by this
+     * {@link DirectPipelineResult} is complete, returning the terminal state.
+     *
+     * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
+     * exception. Future calls to {@link #getState()} will return
+     * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
+     *
+     * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded}
+     * {@link PCollection}, and the {@link PipelineRunner} was created with
+     * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
+     * this method will never return.
+     *
+     * See also {@link InProcessExecutor#awaitCompletion()}.
+     */
+    public State awaitCompletion() throws Throwable {
+      if (!state.isTerminal()) {
+        try {
+          executor.awaitCompletion();
+          state = State.DONE;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw e;
+        } catch (Throwable t) {
+          state = State.FAILED;
+          throw t;
+        }
+      }
+      return state;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
index ccf4c2b..bed61ec 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 980d764..14570a5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 2efaad3..bbe8787 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
 /**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
  * {@link PTransform}.
  */
 class FlattenEvaluatorFactory implements TransformEvaluatorFactory {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 2a965ed..dcbe3d1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index bfecc9d..d121442 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 95095fa..fb8eb7c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
index 52bc575..0c7449c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
index f374f99..bd07040 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -27,7 +27,7 @@ import java.util.Map;
 
 /**
  * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
- * {@link InProcessPipelineRunner}.
+ * {@link DirectRunner}.
  */
 public class InProcessBundleOutputManager implements OutputManager {
   private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index db8baa0..220ff83 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -22,9 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
 import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -58,11 +58,11 @@ import javax.annotation.Nullable;
 
 /**
  * The evaluation context for a specific pipeline being executed by the
- * {@link InProcessPipelineRunner}. Contains state shared within the execution across all
+ * {@link DirectRunner}. Contains state shared within the execution across all
  * transforms.
  *
  * <p>{@link InProcessEvaluationContext} contains shared state for an execution of the
- * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This
+ * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
  * consists of views into underlying state and watermark implementations, access to read and write
  * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
  * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
@@ -79,7 +79,7 @@ class InProcessEvaluationContext {
   private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
 
   /** The options that were used to create this {@link Pipeline}. */
-  private final InProcessPipelineOptions options;
+  private final DirectOptions options;
 
   private final BundleFactory bundleFactory;
   /** The current processing time and event time watermarks and timers. */
@@ -97,7 +97,7 @@ class InProcessEvaluationContext {
   private final CounterSet mergedCounters;
 
   public static InProcessEvaluationContext create(
-      InProcessPipelineOptions options,
+      DirectOptions options,
       BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
@@ -108,7 +108,7 @@ class InProcessEvaluationContext {
   }
 
   private InProcessEvaluationContext(
-      InProcessPipelineOptions options,
+      DirectOptions options,
       BundleFactory bundleFactory,
       Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
@@ -295,7 +295,7 @@ class InProcessEvaluationContext {
   /**
    * Get the options used by this {@link Pipeline}.
    */
-  public InProcessPipelineOptions getPipelineOptions() {
+  public DirectOptions getPipelineOptions() {
     return options;
   }
 
@@ -389,7 +389,7 @@ class InProcessEvaluationContext {
    *
    * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
    * {@link PCollection PCollections}, returns the value of
-   * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
+   * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
    */
   public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
     // if the PTransform's watermark isn't at the max value, it isn't done

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
index 4f10b3a..d2558ce 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 
 /**
- * Execution Context for the {@link InProcessPipelineRunner}.
+ * Execution Context for the {@link DirectRunner}.
  *
  * This implementation is not thread safe. A new {@link InProcessExecutionContext} must be created
  * for each thread that requires it.
@@ -51,7 +51,7 @@ class InProcessExecutionContext
   }
 
   /**
-   * Step Context for the {@link InProcessPipelineRunner}.
+   * Step Context for the {@link DirectRunner}.
    */
   public class InProcessStepContext
       extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
index d811e1b..1cfa544 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
index 5ded8b6..53b93d0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 
 /**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
  * {@link GroupByKeyOnly} {@link PTransform}.
  */
 class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
index a10d496..3604bca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
@@ -22,8 +22,8 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -45,7 +45,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
  * {@link GroupByKeyOnly} {@link PTransform}.
  */
 class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
deleted file mode 100644
index 0498521..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Options that can be used to configure the {@link InProcessPipelineRunner}.
- */
-public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions {
-  /**
-   * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
-   * to execute {@link PTransform PTransforms}.
-   *
-   * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
-   * it cannot enter a state in which it will not schedule additional pending work unless currently
-   * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
-   *
-   * <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
-   * {@link Executors#newCachedThreadPool()}.
-   */
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
-  ExecutorServiceFactory getExecutorServiceFactory();
-
-  void setExecutorServiceFactory(ExecutorServiceFactory executorService);
-
-  /**
-   * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
-   * system time when time values are required by the evaluator.
-   */
-  @Default.InstanceFactory(NanosOffsetClock.Factory.class)
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Description(
-      "The processing time source used by the pipeline. When the current time is "
-          + "needed by the evaluator, the result of clock#now() is used.")
-  Clock getClock();
-
-  void setClock(Clock clock);
-
-  @Default.Boolean(false)
-  @Description(
-      "If the pipeline should shut down producers which have reached the maximum "
-          + "representable watermark. If this is set to true, a pipeline in which all PTransforms "
-          + "have reached the maximum watermark will be shut down, even if there are unbounded "
-          + "sources that could produce additional (late) data. By default, if the pipeline "
-          + "contains any unbounded PCollections, it will run until explicitly shut down.")
-  boolean isShutdownUnboundedProducersWithMaxWatermark();
-
-  void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
-
-  @Default.Boolean(true)
-  @Description(
-      "If the pipeline should block awaiting completion of the pipeline. If set to true, "
-          + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, "
-          + "the Pipeline will execute asynchronously. If set to false, the completion of the "
-          + "pipeline can be awaited on by use of InProcessPipelineResult#awaitCompletion().")
-  boolean isBlockOnRun();
-
-  void setBlockOnRun(boolean b);
-
-  @Default.Boolean(true)
-  @Description(
-      "Controls whether the runner should ensure that all of the elements of every "
-          + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
-          + "at any point, or output elements after they are output.")
-  boolean isTestImmutability();
-
-  void setTestImmutability(boolean test);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
deleted file mode 100644
index 8847c58..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.MapAggregatorValues;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-/**
- * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
- * {@link PCollection PCollections}.
- */
-@Experimental
-public class InProcessPipelineRunner
-    extends PipelineRunner<InProcessPipelineRunner.InProcessPipelineResult> {
-  /**
-   * The default set of transform overrides to use in the {@link InProcessPipelineRunner}.
-   *
-   * <p>A transform override must have a single-argument constructor that takes an instance of the
-   * type of transform it is overriding.
-   */
-  @SuppressWarnings("rawtypes")
-  private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
-      defaultTransformOverrides =
-          ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
-              .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
-              .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
-              .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
-              .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
-              .build();
-
-  /**
-   * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
-   * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
-   * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
-   *
-   * @param <T> the type of elements that can be added to this bundle
-   */
-  public static interface UncommittedBundle<T> {
-    /**
-     * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
-     */
-    PCollection<T> getPCollection();
-
-    /**
-     * Outputs an element to this bundle.
-     *
-     * @param element the element to add to this bundle
-     * @return this bundle
-     */
-    UncommittedBundle<T> add(WindowedValue<T> element);
-
-    /**
-     * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
-     * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
-     * will throw an {@link IllegalStateException} if called after a call to commit.
-     * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
-     *                                   committed
-     */
-    CommittedBundle<T> commit(Instant synchronizedProcessingTime);
-  }
-
-  /**
-   * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
-   * eventually committed. Committed elements are executed by the {@link PTransform PTransforms}
-   * that consume the {@link PCollection} this bundle is
-   * a part of at a later point.
-   * @param <T> the type of elements contained within this bundle
-   */
-  public static interface CommittedBundle<T> {
-    /**
-     * Returns the PCollection that the elements of this bundle belong to.
-     */
-    PCollection<T> getPCollection();
-
-    /**
-     * Returns the key that was output in the most recent {@link GroupByKey} in the
-     * execution of this bundle.
-     */
-    StructuralKey<?> getKey();
-
-    /**
-     * Returns an {@link Iterable} containing all of the elements that have been added to this
-     * {@link CommittedBundle}.
-     */
-    Iterable<WindowedValue<T>> getElements();
-
-    /**
-     * Returns the processing time output watermark at the time the producing {@link PTransform}
-     * committed this bundle. Downstream synchronized processing time watermarks cannot progress
-     * past this point before consuming this bundle.
-     *
-     * <p>This value is no greater than the earliest incomplete processing time or synchronized
-     * processing time {@link TimerData timer} at the time this bundle was committed, including any
-     * timers that fired to produce this bundle.
-     */
-    Instant getSynchronizedProcessingOutputWatermark();
-
-    /**
-     * Return a new {@link CommittedBundle} that is like this one, except calls to
-     * {@link #getElements()} will return the provided elements. This bundle is unchanged.
-     *
-     * <p>
-     * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
-     * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
-     * the current bundle. This is used to ensure a {@link PTransform} that could not complete
-     * processing on input elements properly holds the synchronized processing time to the
-     * appropriate value.
-     */
-    CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
-  }
-
-  /**
-   * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
-   * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
-   * @param <ElemT> the type of elements the input {@link PCollection} contains.
-   * @param <ViewT> the type of the PCollectionView this writer writes to.
-   */
-  public static interface PCollectionViewWriter<ElemT, ViewT> {
-    void add(Iterable<WindowedValue<ElemT>> values);
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-  private final InProcessPipelineOptions options;
-
-  public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
-    return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
-  }
-
-  private InProcessPipelineRunner(InProcessPipelineOptions options) {
-    this.options = options;
-  }
-
-  /**
-   * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
-   */
-  public InProcessPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass());
-    if (overrideFactory != null) {
-      PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform);
-
-      return super.apply(customTransform, input);
-    }
-    // If there is no override, or we should not apply the override, apply the original transform
-    return super.apply(transform, input);
-  }
-
-  @Override
-  public InProcessPipelineResult run(Pipeline pipeline) {
-    ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
-    pipeline.traverseTopologically(consumerTrackingVisitor);
-    for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
-      unfinalized.finishSpecifying();
-    }
-    @SuppressWarnings("rawtypes")
-    KeyedPValueTrackingVisitor keyedPValueVisitor =
-        KeyedPValueTrackingVisitor.create(
-            ImmutableSet.<Class<? extends PTransform>>of(
-                GroupByKey.class, InProcessGroupByKeyOnly.class));
-    pipeline.traverseTopologically(keyedPValueVisitor);
-
-    DisplayDataValidator.validatePipeline(pipeline);
-
-    InProcessEvaluationContext context =
-        InProcessEvaluationContext.create(
-            getPipelineOptions(),
-            createBundleFactory(getPipelineOptions()),
-            consumerTrackingVisitor.getRootTransforms(),
-            consumerTrackingVisitor.getValueToConsumers(),
-            consumerTrackingVisitor.getStepNames(),
-            consumerTrackingVisitor.getViews());
-
-    // independent executor service for each run
-    ExecutorService executorService =
-        context.getPipelineOptions().getExecutorServiceFactory().create();
-    InProcessExecutor executor =
-        ExecutorServiceParallelExecutor.create(
-            executorService,
-            consumerTrackingVisitor.getValueToConsumers(),
-            keyedPValueVisitor.getKeyedPValues(),
-            TransformEvaluatorRegistry.defaultRegistry(),
-            defaultModelEnforcements(options),
-            context);
-    executor.start(consumerTrackingVisitor.getRootTransforms());
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
-    InProcessPipelineResult result =
-        new InProcessPipelineResult(executor, context, aggregatorSteps);
-    if (options.isBlockOnRun()) {
-      try {
-        result.awaitCompletion();
-      } catch (UserCodeException userException) {
-        throw new PipelineExecutionException(userException.getCause());
-      } catch (Throwable t) {
-        if (t instanceof RuntimeException) {
-          throw (RuntimeException) t;
-        }
-        throw new RuntimeException(t);
-      }
-    }
-    return result;
-  }
-
-  private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-      defaultModelEnforcements(InProcessPipelineOptions options) {
-    ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-        enforcements = ImmutableMap.builder();
-    Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
-    enforcements.put(ParDo.Bound.class, parDoEnforcements);
-    enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
-    return enforcements.build();
-  }
-
-  private Collection<ModelEnforcementFactory> createParDoEnforcements(
-      InProcessPipelineOptions options) {
-    ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
-    if (options.isTestImmutability()) {
-      enforcements.add(ImmutabilityEnforcementFactory.create());
-    }
-    return enforcements.build();
-  }
-
-  private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
-    BundleFactory bundleFactory = InProcessBundleFactory.create();
-    if (pipelineOptions.isTestImmutability()) {
-      bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
-    }
-    return bundleFactory;
-  }
-
-  /**
-   * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
-   *
-   * Throws {@link UnsupportedOperationException} for all methods.
-   */
-  public static class InProcessPipelineResult implements PipelineResult {
-    private final InProcessExecutor executor;
-    private final InProcessEvaluationContext evaluationContext;
-    private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
-    private State state;
-
-    private InProcessPipelineResult(
-        InProcessExecutor executor,
-        InProcessEvaluationContext evaluationContext,
-        Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
-      this.executor = executor;
-      this.evaluationContext = evaluationContext;
-      this.aggregatorSteps = aggregatorSteps;
-      // Only ever constructed after the executor has started.
-      this.state = State.RUNNING;
-    }
-
-    @Override
-    public State getState() {
-      return state;
-    }
-
-    @Override
-    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-        throws AggregatorRetrievalException {
-      CounterSet counters = evaluationContext.getCounters();
-      Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
-      Map<String, T> stepValues = new HashMap<>();
-      for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
-        if (steps.contains(transform.getTransform())) {
-          String stepName =
-              String.format(
-                  "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
-          Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
-          if (counter != null) {
-            stepValues.put(transform.getFullName(), counter.getAggregate());
-          }
-        }
-      }
-      return new MapAggregatorValues<>(stepValues);
-    }
-
-    /**
-     * Blocks until the {@link Pipeline} execution represented by this
-     * {@link InProcessPipelineResult} is complete, returning the terminal state.
-     *
-     * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
-     * exception. Future calls to {@link #getState()} will return
-     * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
-     *
-     * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded}
-     * {@link PCollection}, and the {@link PipelineRunner} was created with
-     * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
-     * this method will never return.
-     *
-     * See also {@link InProcessExecutor#awaitCompletion()}.
-     */
-    public State awaitCompletion() throws Throwable {
-      if (!state.isTerminal()) {
-        try {
-          executor.awaitCompletion();
-          state = State.DONE;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw e;
-        } catch (Throwable t) {
-          state = State.FAILED;
-          throw t;
-        }
-      }
-      return state;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
deleted file mode 100644
index 4a09de7..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link InProcessPipelineRunner}.
- */
-public class InProcessRegistrar {
-  private InProcessRegistrar() {}
-  /**
-   * Registers the {@link InProcessPipelineRunner}.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class InProcessRunner implements PipelineRunnerRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(InProcessPipelineRunner.class);
-    }
-  }
-
-  /**
-   * Registers the {@link InProcessPipelineOptions}.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class InProcessOptions implements PipelineOptionsRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(InProcessPipelineOptions.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
index 0bc3ea1..92127b4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index cc9b6da..758ee24 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
index 6162ba0..e0bbfcb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
index a3e2f18..b9f4808 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.DoFnRunner;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index 8945242..58d6f00 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -32,7 +32,7 @@ import com.google.common.cache.LoadingCache;
 import java.util.Map;
 
 /**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
  * {@link BoundMulti} primitive {@link PTransform}.
  */
 class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 15704d7..afbb6ed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -32,7 +32,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 
 /**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
  * {@link Bound ParDo.Bound} primitive {@link PTransform}.
  */
 class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index aef62b2..ba792d3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2bafda1d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index b2e3897..eacea91 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;