Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/15 22:21:07 UTC

[2/4] incubator-beam git commit: Remove the DirectPipelineRunner from the Core SDK

Remove the DirectPipelineRunner from the Core SDK


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf476e12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf476e12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf476e12

Branch: refs/heads/master
Commit: bf476e12beedd4598c388fae02b662b3d4794aaa
Parents: b31be38
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 14 17:52:49 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 15 10:10:05 2016 -0700

----------------------------------------------------------------------
 .../examples/common/DataflowExampleUtils.java   |   11 +-
 runners/spark/pom.xml                           |    6 +
 .../translation/TransformTranslatorTest.java    |    4 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   |    1 -
 .../main/java/org/apache/beam/sdk/io/Read.java  |   44 -
 .../java/org/apache/beam/sdk/io/TextIO.java     |    1 -
 .../beam/sdk/options/DirectPipelineOptions.java |    1 -
 .../sdk/runners/DirectPipelineRegistrar.java    |   55 -
 .../beam/sdk/runners/DirectPipelineRunner.java  | 1298 ------------------
 .../org/apache/beam/sdk/transforms/Flatten.java |   32 -
 .../org/apache/beam/sdk/transforms/ParDo.java   |  302 +---
 .../org/apache/beam/sdk/transforms/View.java    |   24 -
 .../sdk/util/DirectModeExecutionContext.java    |  130 --
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |    1 -
 .../java/org/apache/beam/sdk/PipelineTest.java  |    4 +-
 .../io/BoundedReadFromUnboundedSourceTest.java  |    6 -
 .../runners/DirectPipelineRegistrarTest.java    |   71 -
 .../sdk/runners/DirectPipelineRunnerTest.java   |  222 ---
 .../beam/sdk/runners/PipelineRunnerTest.java    |    9 +-
 .../apache/beam/sdk/transforms/CombineTest.java |   21 -
 .../beam/sdk/transforms/GroupByKeyTest.java     |   13 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |   29 +-
 .../main/java/common/DataflowExampleUtils.java  |   13 +-
 testing/travis/test_wordcount.sh                |    4 +-
 24 files changed, 40 insertions(+), 2262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
index fb4f3bf..a0b7319 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.examples.common;
 
+import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -25,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.IntraBundleParallelization;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
@@ -315,11 +316,13 @@ public class DataflowExampleUtils {
 
   /**
    * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with
-   * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming
-   * flag value.
+   * streaming, and if streaming is specified, use the DataflowPipelineRunner.
    */
   public void setupRunner() {
-    if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) {
+    Class<? extends PipelineRunner<?>> runner = options.getRunner();
+    if (options.isStreaming()
+        && (runner.equals(DataflowPipelineRunner.class)
+            || runner.equals(BlockingDataflowPipelineRunner.class))) {
       // In order to cancel the pipelines automatically,
       // {@literal DataflowPipelineRunner} is forced to be used.
       options.setRunner(DataflowPipelineRunner.class);
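
For reference, the check above keys off the runner class carried in the pipeline options. A minimal sketch of the calling pattern it supports (the argument handling and constructor use here are illustrative, not part of this commit):

    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    DataflowExampleUtils exampleUtils = new DataflowExampleUtils(options);
    // With streaming enabled and a Dataflow runner selected, setupRunner()
    // swaps in the non-blocking DataflowPipelineRunner so the utilities can
    // cancel the job automatically.
    exampleUtils.setupRunner();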

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4110689..e7d0834 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -118,6 +118,12 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>0.2.0-incubating-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index 4ef26d3..01f3070 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -21,12 +21,12 @@ package org.apache.beam.runners.spark.translation;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.runners.direct.InProcessPipelineRunner;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -58,7 +58,7 @@ public class TransformTranslatorTest {
    */
   @Test
   public void testTextIOReadAndWriteTransforms() throws IOException {
-    String directOut = runPipeline(DirectPipelineRunner.class);
+    String directOut = runPipeline(InProcessPipelineRunner.class);
     String sparkOut = runPipeline(SparkPipelineRunner.class);
 
     List<String> directOutput =
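
The runPipeline helper is not shown in this hunk; selecting a runner through options follows the usual Beam pattern, sketched below with placeholder file paths:

    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(InProcessPipelineRunner.class);  // was DirectPipelineRunner.class
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.from("/tmp/words.txt"))
        .apply(TextIO.Write.to("/tmp/out"));
    p.run();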

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 7e24253..2a5698c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index fb40063..c0440f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -20,11 +20,9 @@ package org.apache.beam.sdk.io;
 import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -32,9 +30,6 @@ import org.apache.beam.sdk.values.PInput;
 
 import org.joda.time.Duration;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import javax.annotation.Nullable;
 
 /**
@@ -153,45 +148,6 @@ public class Read {
             .withLabel("Read Source"))
           .include(source);
     }
-
-    static {
-      registerDefaultTransformEvaluator();
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private static void registerDefaultTransformEvaluator() {
-      DirectPipelineRunner.registerDefaultTransformEvaluator(
-          Bounded.class,
-          new DirectPipelineRunner.TransformEvaluator<Bounded>() {
-            @Override
-            public void evaluate(
-                Bounded transform, DirectPipelineRunner.EvaluationContext context) {
-              evaluateReadHelper(transform, context);
-            }
-
-            private <T> void evaluateReadHelper(
-                Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) {
-              try {
-                List<DirectPipelineRunner.ValueWithMetadata<T>> output = new ArrayList<>();
-                BoundedSource<T> source = transform.getSource();
-                try (BoundedSource.BoundedReader<T> reader =
-                    source.createReader(context.getPipelineOptions())) {
-                  for (boolean available = reader.start();
-                      available;
-                      available = reader.advance()) {
-                    output.add(
-                        DirectPipelineRunner.ValueWithMetadata.of(
-                            WindowedValue.timestampedValueInGlobalWindow(
-                                reader.getCurrent(), reader.getCurrentTimestamp())));
-                  }
-                }
-                context.setPCollectionValuesWithMetadata(context.getOutput(transform), output);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          });
-    }
   }
 
   /**
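
The evaluator deleted above drove the source through the standard BoundedReader protocol, which this commit leaves unchanged. A sketch of that read loop, assuming a BoundedSource<T> named source and a PipelineOptions instance named options are in scope:

    List<T> elements = new ArrayList<>();
    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
      // start() positions the reader at the first element (if any);
      // advance() steps through the remaining elements until exhausted.
      for (boolean available = reader.start(); available; available = reader.advance()) {
        elements.add(reader.getCurrent());
      }
    }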

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 13cb45e..bbef072 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
index 4cdc0ca..c2095e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.options;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java
deleted file mode 100644
index 7dd0fdd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DirectPipeline}.
- */
-public class DirectPipelineRegistrar {
-  private DirectPipelineRegistrar() { }
-
-  /**
-   * Register the {@link DirectPipelineRunner}.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class Runner implements PipelineRunnerRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectPipelineRunner.class);
-    }
-  }
-
-  /**
-   * Register the {@link DirectPipelineOptions}.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class Options implements PipelineOptionsRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(DirectPipelineOptions.class);
-    }
-  }
-}
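
Runner and options discovery in Beam goes through ServiceLoader via the AutoService annotation, which is exactly the pattern the deleted registrar implemented. A sketch of the equivalent registrar a runner module provides in its place (the InProcessPipelineRunner name is taken from the test changes above; the enclosing class is illustrative):

    /** Registers the runner so PipelineOptionsFactory can find it by name. */
    @AutoService(PipelineRunnerRegistrar.class)
    public static class Runner implements PipelineRunnerRegistrar {
      @Override
      public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
        return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
            InProcessPipelineRunner.class);
      }
    }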

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
deleted file mode 100644
index 1eb25c5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
+++ /dev/null
@@ -1,1298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Partition;
-import org.apache.beam.sdk.transforms.Partition.PartitionFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.AssignWindows;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.MapAggregatorValues;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TypedPValue;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Executes the operations in the pipeline directly, in this process, without
- * any optimization.  Useful for small local execution and tests.
- *
- * <p>Throws an exception from {@link #run} if execution fails.
- *
- * <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code DirectPipelineRunner}, the Cloud Platform account that you configured with the
- * <a href="https://cloud.google.com/sdk/gcloud">gcloud</a> executable will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DirectPipelineRunner
-    extends PipelineRunner<DirectPipelineRunner.EvaluationResults> {
-  private static final Logger LOG = LoggerFactory.getLogger(DirectPipelineRunner.class);
-
-  /**
-   * A source of random data, which can be seeded if determinism is desired.
-   */
-  private Random rand;
-
-  /**
-   * A map from PTransform class to the corresponding
-   * TransformEvaluator to use to evaluate that transform.
-   *
-   * <p>A static map that contains system-wide defaults.
-   */
-  private static Map<Class, TransformEvaluator> defaultTransformEvaluators =
-      new HashMap<>();
-
-  /**
-   * A map from PTransform class to the corresponding
-   * TransformEvaluator to use to evaluate that transform.
-   *
-   * <p>An instance map that contains bindings for this DirectPipelineRunner.
-   * Bindings in this map override those in the default map.
-   */
-  private Map<Class, TransformEvaluator> localTransformEvaluators =
-      new HashMap<>();
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be evaluated by default by the corresponding
-   * TransformEvaluator.
-   */
-  public static <TransformT extends PTransform<?, ?>>
-  void registerDefaultTransformEvaluator(
-      Class<TransformT> transformClass,
-      TransformEvaluator<? super TransformT> transformEvaluator) {
-    if (defaultTransformEvaluators.put(transformClass, transformEvaluator)
-        != null) {
-      throw new IllegalArgumentException(
-          "defining multiple evaluators for " + transformClass);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be evaluated by the corresponding TransformEvaluator.
-   * Overrides any bindings specified by
-   * {@link #registerDefaultTransformEvaluator}.
-   */
-  public <TransformT extends PTransform<?, ?>>
-  void registerTransformEvaluator(
-      Class<TransformT> transformClass,
-      TransformEvaluator<TransformT> transformEvaluator) {
-    if (localTransformEvaluators.put(transformClass, transformEvaluator)
-        != null) {
-      throw new IllegalArgumentException(
-          "defining multiple evaluators for " + transformClass);
-    }
-  }
-
-  /**
-   * Returns the TransformEvaluator to use for instances of the
-   * specified PTransform class, or null if none registered.
-   */
-  public <TransformT extends PTransform<?, ?>>
-      TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> transformClass) {
-    TransformEvaluator<TransformT> transformEvaluator =
-        localTransformEvaluators.get(transformClass);
-    if (transformEvaluator == null) {
-      transformEvaluator = defaultTransformEvaluators.get(transformClass);
-    }
-    return transformEvaluator;
-  }
-
-  /**
-   * Constructs a DirectPipelineRunner from the given options.
-   */
-  public static DirectPipelineRunner fromOptions(PipelineOptions options) {
-    DirectPipelineOptions directOptions =
-        PipelineOptionsValidator.validate(DirectPipelineOptions.class, options);
-    LOG.debug("Creating DirectPipelineRunner");
-    return new DirectPipelineRunner(directOptions);
-  }
-
-  /**
-   * Enable runtime testing to verify that all functions and {@link Coder}
-   * instances can be serialized.
-   *
-   * <p>Enabled by default.
-   *
-   * <p>This method modifies the {@code DirectPipelineRunner} instance and
-   * returns itself.
-   */
-  public DirectPipelineRunner withSerializabilityTesting(boolean enable) {
-    this.testSerializability = enable;
-    return this;
-  }
-
-  /**
-   * Enable runtime testing to verify that all values can be encoded.
-   *
-   * <p>Enabled by default.
-   *
-   * <p>This method modifies the {@code DirectPipelineRunner} instance and
-   * returns itself.
-   */
-  public DirectPipelineRunner withEncodabilityTesting(boolean enable) {
-    this.testEncodability = enable;
-    return this;
-  }
-
-  /**
-   * Enable runtime testing to verify that functions do not depend on order
-   * of the elements.
-   *
-   * <p>This is accomplished by randomizing the order of elements.
-   *
-   * <p>Enabled by default.
-   *
-   * <p>This method modifies the {@code DirectPipelineRunner} instance and
-   * returns itself.
-   */
-  public DirectPipelineRunner withUnorderednessTesting(boolean enable) {
-    this.testUnorderedness = enable;
-    return this;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (transform instanceof Combine.GroupedValues) {
-      return (OutputT) applyTestCombine((Combine.GroupedValues) transform, (PCollection) input);
-    } else if (transform instanceof TextIO.Write.Bound) {
-      return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input);
-    } else if (transform instanceof AvroIO.Write.Bound) {
-      return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input);
-    } else if (transform instanceof GroupByKey) {
-      return (OutputT)
-          ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
-    } else if (transform instanceof Window.Bound) {
-      return (OutputT)
-          ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform));
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-  private <K, InputT, AccumT, OutputT> PCollection<KV<K, OutputT>> applyTestCombine(
-      Combine.GroupedValues<K, InputT, OutputT> transform,
-      PCollection<KV<K, Iterable<InputT>>> input) {
-
-    PCollection<KV<K, OutputT>> output = input
-        .apply(ParDo.of(TestCombineDoFn.create(transform, input, testSerializability, rand))
-            .withSideInputs(transform.getSideInputs()));
-
-    try {
-      output.setCoder(transform.getDefaultOutputCoder(input));
-    } catch (CannotProvideCoderException exc) {
-      // let coder inference occur later, if it can
-    }
-    return output;
-  }
-
-  private static class ElementProcessingOrderPartitionFn<T> implements PartitionFn<T> {
-    private int elementNumber;
-    @Override
-    public int partitionFor(T elem, int numPartitions) {
-      return elementNumber++ % numPartitions;
-    }
-  }
-
-  /**
-   * Applies TextIO.Write honoring user requested sharding controls (i.e. withNumShards)
-   * by applying a partition function based upon the number of shards the user requested.
-   */
-  private static class DirectTextIOWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final TextIO.Write.Bound<T> transform;
-
-    private DirectTextIOWrite(TextIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      checkState(transform.getNumShards() > 1,
-          "DirectTextIOWrite is expected to only be used when sharding controls are required.");
-
-      // Evenly distribute all the elements across the partitions.
-      PCollectionList<T> partitionedElements =
-          input.apply(Partition.of(transform.getNumShards(),
-                                   new ElementProcessingOrderPartitionFn<T>()));
-
-      // For each input PCollection partition, create a write transform that represents
-      // one of the specific shards.
-      for (int i = 0; i < transform.getNumShards(); ++i) {
-        /*
-         * This logic mirrors the file naming strategy within
-         * {@link FileBasedSink#generateDestinationFilenames()}
-         */
-        String outputFilename = IOChannelUtils.constructName(
-            transform.getFilenamePrefix(),
-            transform.getShardNameTemplate(),
-            getFileExtension(transform.getFilenameSuffix()),
-            i,
-            transform.getNumShards());
-
-        String transformName = String.format("%s(Shard:%s)", transform.getName(), i);
-        partitionedElements.get(i).apply(transformName,
-            transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename));
-      }
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-  /**
-   * Returns the file extension to be used. If the user did not request a file
-   * extension then this method returns the empty string. Otherwise this method
-   * adds a {@code "."} to the beginning of the users extension if one is not present.
-   *
-   * <p>This is copied from {@link FileBasedSink} to not expose it.
-   */
-  private static String getFileExtension(String usersExtension) {
-    if (usersExtension == null || usersExtension.isEmpty()) {
-      return "";
-    }
-    if (usersExtension.startsWith(".")) {
-      return usersExtension;
-    }
-    return "." + usersExtension;
-  }
-
-  /**
-   * Apply the override for TextIO.Write.Bound if the user requested sharding controls
-   * greater than one.
-   */
-  private <T> PDone applyTextIOWrite(TextIO.Write.Bound<T> transform, PCollection<T> input) {
-    if (transform.getNumShards() <= 1) {
-      // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never
-      // requested sharding controls greater than 1, we default to outputting to 1 file.
-      return super.apply(transform.withNumShards(1), input);
-    }
-    return input.apply(new DirectTextIOWrite<>(transform));
-  }
-
-  /**
-   * Applies AvroIO.Write honoring user requested sharding controls (i.e. withNumShards)
-   * by applying a partition function based upon the number of shards the user requested.
-   */
-  private static class DirectAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final AvroIO.Write.Bound<T> transform;
-
-    private DirectAvroIOWrite(AvroIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      checkState(transform.getNumShards() > 1,
-          "DirectAvroIOWrite is expected to only be used when sharding controls are required.");
-
-      // Evenly distribute all the elements across the partitions.
-      PCollectionList<T> partitionedElements =
-          input.apply(Partition.of(transform.getNumShards(),
-                                   new ElementProcessingOrderPartitionFn<T>()));
-
-      // For each input PCollection partition, create a write transform that represents
-      // one of the specific shards.
-      for (int i = 0; i < transform.getNumShards(); ++i) {
-        /*
-         * This logic mirrors the file naming strategy within
-         * {@link FileBasedSink#generateDestinationFilenames()}
-         */
-        String outputFilename = IOChannelUtils.constructName(
-            transform.getFilenamePrefix(),
-            transform.getShardNameTemplate(),
-            getFileExtension(transform.getFilenameSuffix()),
-            i,
-            transform.getNumShards());
-
-        String transformName = String.format("%s(Shard:%s)", transform.getName(), i);
-        partitionedElements.get(i).apply(transformName,
-            transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename));
-      }
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
-      extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private final Window.Bound<T> wrapped;
-
-    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
-      this.wrapped = wrapped;
-    }
-
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
-      WindowFn<T, BoundedWindow> windowFn =
-          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
-      // If the Window.Bound transform only changed parts other than the WindowFn, then
-      // we skip AssignWindows even though it should be harmless in a perfect world.
-      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
-      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
-      // AssignWindows in this case.
-      if (wrapped.getWindowFn() == null) {
-        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
-            .setWindowingStrategyInternal(outputStrategy);
-      } else {
-        return input
-            .apply("AssignWindows", new AssignWindows<T, BoundedWindow>(windowFn))
-            .setWindowingStrategyInternal(outputStrategy);
-      }
-    }
-  }
-
-  private static class IdentityFn<T> extends DoFn<T, T> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
-
-  /**
-   * Apply the override for AvroIO.Write.Bound if the user requested sharding controls
-   * greater than one.
-   */
-  private <T> PDone applyAvroIOWrite(AvroIO.Write.Bound<T> transform, PCollection<T> input) {
-    if (transform.getNumShards() <= 1) {
-      // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never
-      // requested sharding controls greater than 1, we default to outputting to 1 file.
-      return super.apply(transform.withNumShards(1), input);
-    }
-    return input.apply(new DirectAvroIOWrite<>(transform));
-  }
-
-  /**
-   * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE and EXTRACT phases (
-   * see {@code org.apache.beam.sdk.runners.worker.CombineValuesFn}). In order to emulate
-   * this for the {@link DirectPipelineRunner} and provide an experience closer to the service, go
-   * through heavy serializability checks for the equivalent of the results of the ADD phase, but
-   * after the {@link org.apache.beam.sdk.transforms.GroupByKey} shuffle, and the MERGE
-   * phase. Doing these checks ensure that not only is the accumulator coder serializable, but
-   * the accumulator coder can actually serialize the data in question.
-   */
-  public static class TestCombineDoFn<K, InputT, AccumT, OutputT>
-      extends DoFn<KV<K, Iterable<InputT>>, KV<K, OutputT>> {
-    private final PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner;
-    private final Coder<AccumT> accumCoder;
-    private final boolean testSerializability;
-    private final Random rand;
-
-    public static <K, InputT, AccumT, OutputT> TestCombineDoFn<K, InputT, AccumT, OutputT> create(
-        Combine.GroupedValues<K, InputT, OutputT> transform,
-        PCollection<KV<K, Iterable<InputT>>> input,
-        boolean testSerializability,
-        Random rand) {
-
-      AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn = transform.getAppliedFn(
-          input.getPipeline().getCoderRegistry(), input.getCoder(), input.getWindowingStrategy());
-
-      return new TestCombineDoFn(
-          PerKeyCombineFnRunners.create(fn.getFn()),
-          fn.getAccumulatorCoder(),
-          testSerializability,
-          rand);
-    }
-
-    public TestCombineDoFn(
-        PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner,
-        Coder<AccumT> accumCoder,
-        boolean testSerializability,
-        Random rand) {
-      this.fnRunner = fnRunner;
-      this.accumCoder = accumCoder;
-      this.testSerializability = testSerializability;
-      this.rand = rand;
-
-      // Check that this does not crash, specifically to catch anonymous CustomCoder subclasses.
-      this.accumCoder.getEncodingId();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      K key = c.element().getKey();
-      Iterable<InputT> values = c.element().getValue();
-      List<AccumT> groupedPostShuffle =
-          ensureSerializableByCoder(ListCoder.of(accumCoder),
-              addInputsRandomly(fnRunner, key, values, rand, c),
-              "After addInputs of KeyedCombineFn " + fnRunner.fn().toString());
-      AccumT merged =
-          ensureSerializableByCoder(accumCoder,
-            fnRunner.mergeAccumulators(key, groupedPostShuffle, c),
-            "After mergeAccumulators of KeyedCombineFn " + fnRunner.fn().toString());
-      // Note: The serializability of KV<K, OutputT> is ensured by the
-      // runner itself, since it's a transform output.
-      c.output(KV.of(key, fnRunner.extractOutput(key, merged, c)));
-    }
-
-    /**
-     * Create a random list of accumulators from the given list of values.
-     *
-     * <p>Visible for testing purposes only.
-     */
-    public static <K, AccumT, InputT> List<AccumT> addInputsRandomly(
-        PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, ?> fnRunner,
-        K key,
-        Iterable<InputT> values,
-        Random random,
-        DoFn<?, ?>.ProcessContext c) {
-      List<AccumT> out = new ArrayList<AccumT>();
-      int i = 0;
-      AccumT accumulator = fnRunner.createAccumulator(key, c);
-      boolean hasInput = false;
-
-      for (InputT value : values) {
-        accumulator = fnRunner.addInput(key, accumulator, value, c);
-        hasInput = true;
-
-        // For each index i, flip a 1/2^i weighted coin for whether to
-        // create a new accumulator after index i is added, i.e. [0]
-        // is guaranteed, [1] is an even 1/2, [2] is 1/4, etc. The
-        // goal is to partition the inputs into accumulators, and make
-        // the accumulators potentially lumpy.  Also compact about half
-        // of the accumulators.
-        if (i == 0 || random.nextInt(1 << Math.min(i, 30)) == 0) {
-          if (i % 2 == 0) {
-            accumulator = fnRunner.compact(key, accumulator, c);
-          }
-          out.add(accumulator);
-          accumulator = fnRunner.createAccumulator(key, c);
-          hasInput = false;
-        }
-        i++;
-      }
-      if (hasInput) {
-        out.add(accumulator);
-      }
-
-      Collections.shuffle(out, random);
-      return out;
-    }
-
-    public <T> T ensureSerializableByCoder(
-        Coder<T> coder, T value, String errorContext) {
-      if (testSerializability) {
-        return SerializableUtils.ensureSerializableByCoder(
-            coder, value, errorContext);
-      }
-      return value;
-    }
-  }
-
-  @Override
-  public EvaluationResults run(Pipeline pipeline) {
-    LOG.info("Executing pipeline using the DirectPipelineRunner.");
-
-    Evaluator evaluator = new Evaluator(rand);
-    evaluator.run(pipeline);
-
-    // Log all counter values for debugging purposes.
-    for (Counter counter : evaluator.getCounters()) {
-      LOG.info("Final aggregator value: {}", counter);
-    }
-
-    LOG.info("Pipeline execution complete.");
-
-    return evaluator;
-  }
-
-  /**
-   * An evaluator of a PTransform.
-   */
-  public interface TransformEvaluator<TransformT extends PTransform> {
-    public void evaluate(TransformT transform,
-                         EvaluationContext context);
-  }
-
-  /**
-   * The interface provided to registered callbacks for interacting
-   * with the {@code DirectPipelineRunner}, including reading and writing the
-   * values of {@link PCollection}s and {@link PCollectionView}s.
-   */
-  public interface EvaluationResults extends PipelineResult {
-    /**
-     * Retrieves the value of the given PCollection.
-     * Throws an exception if the PCollection's value hasn't already been set.
-     */
-    <T> List<T> getPCollection(PCollection<T> pc);
-
-    /**
-     * Retrieves the windowed value of the given PCollection.
-     * Throws an exception if the PCollection's value hasn't already been set.
-     */
-    <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc);
-
-    /**
-     * Retrieves the values of each PCollection in the given
-     * PCollectionList. Throws an exception if the PCollectionList's
-     * value hasn't already been set.
-     */
-    <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs);
-
-    /**
-     * Retrieves the values indicated by the given {@link PCollectionView}.
-     * Note that within the {@link org.apache.beam.sdk.transforms.DoFn.Context}
-     * implementation a {@link PCollectionView} should convert from this representation to a
-     * suitable side input value.
-     */
-    <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view);
-  }
-
-  /**
-   * An immutable (value, timestamp) pair, along with other metadata necessary
-   * for the implementation of {@code DirectPipelineRunner}.
-   */
-  public static class ValueWithMetadata<V> {
-    /**
-     * Returns a new {@code ValueWithMetadata} with the {@code WindowedValue}.
-     * Key is null.
-     */
-    public static <V> ValueWithMetadata<V> of(WindowedValue<V> windowedValue) {
-      return new ValueWithMetadata<>(windowedValue, null);
-    }
-
-    /**
-     * Returns a new {@code ValueWithMetadata} with the implicit key associated
-     * with this value set.  The key is the last key grouped by in the chain of
-     * productions that produced this element.
-     * These keys are used internally by {@link DirectPipelineRunner} for keeping
-     * persisted state separate across keys.
-     */
-    public ValueWithMetadata<V> withKey(Object key) {
-      return new ValueWithMetadata<>(windowedValue, key);
-    }
-
-    /**
-     * Returns a new {@code ValueWithMetadata} that is a copy of this one, but with
-     * a different value.
-     */
-    public <T> ValueWithMetadata<T> withValue(T value) {
-      return new ValueWithMetadata(windowedValue.withValue(value), getKey());
-    }
-
-    /**
-     * Returns the {@code WindowedValue} associated with this element.
-     */
-    public WindowedValue<V> getWindowedValue() {
-      return windowedValue;
-    }
-
-    /**
-     * Returns the value associated with this element.
-     *
-     * @see #withValue
-     */
-    public V getValue() {
-      return windowedValue.getValue();
-    }
-
-    /**
-     * Returns the timestamp associated with this element.
-     */
-    public Instant getTimestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    /**
-     * Returns the collection of windows this element has been placed into.  May
-     * be null if the {@code PCollection} this element is in has not yet been
-     * windowed.
-     *
-     * @see #getWindows()
-     */
-    public Collection<? extends BoundedWindow> getWindows() {
-      return windowedValue.getWindows();
-    }
-
-
-    /**
-     * Returns the key associated with this element.  May be null if the
-     * {@code PCollection} this element is in is not keyed.
-     *
-     * @see #withKey
-     */
-    public Object getKey() {
-      return key;
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-
-    private final Object key;
-    private final WindowedValue<V> windowedValue;
-
-    private ValueWithMetadata(WindowedValue<V> windowedValue,
-                              Object key) {
-      this.windowedValue = windowedValue;
-      this.key = key;
-    }
-  }
-
-  /**
-   * The interface provided to registered callbacks for interacting
-   * with the {@code DirectPipelineRunner}, including reading and writing the
-   * values of {@link PCollection}s and {@link PCollectionView}s.
-   */
-  public interface EvaluationContext extends EvaluationResults {
-    /**
-     * Returns the configured pipeline options.
-     */
-    DirectPipelineOptions getPipelineOptions();
-
-    /**
-     * Returns the input of the currently being processed transform.
-     */
-    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
-    /**
-     * Returns the output of the currently being processed transform.
-     */
-    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
-    /**
-     * Sets the value of the given PCollection, where each element also has a timestamp
-     * and collection of windows.
-     * Throws an exception if the PCollection's value has already been set.
-     */
-    <T> void setPCollectionValuesWithMetadata(
-        PCollection<T> pc, List<ValueWithMetadata<T>> elements);
-
-    /**
-     * Sets the value of the given PCollection, where each element also has a timestamp
-     * and collection of windows.
-     * Throws an exception if the PCollection's value has already been set.
-     */
-    <T> void setPCollectionWindowedValue(PCollection<T> pc, List<WindowedValue<T>> elements);
-
-    /**
-     * Shorthand for setting the value of a PCollection where the elements do not have
-     * timestamps or windows.
-     * Throws an exception if the PCollection's value has already been set.
-     */
-    <T> void setPCollection(PCollection<T> pc, List<T> elements);
-
-    /**
-     * Retrieves the value of the given PCollection, along with element metadata
-     * such as timestamps and windows.
-     * Throws an exception if the PCollection's value hasn't already been set.
-     */
-    <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc);
-
-    /**
-     * Sets the value associated with the given {@link PCollectionView}.
-     * Throws an exception if the {@link PCollectionView}'s value has already been set.
-     */
-    <ElemT, T, WindowedT> void setPCollectionView(
-        PCollectionView<T> pc,
-        Iterable<WindowedValue<ElemT>> value);
-
-    /**
-     * Ensures that the element is encodable and decodable using the
-     * TypePValue's coder, by encoding it and decoding it, and
-     * returning the result.
-     */
-    <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element);
-
-    /**
-     * If the evaluation context is testing unorderedness,
-     * randomly permutes the order of the elements, in a
-     * copy if !inPlaceAllowed, and returns the permuted list,
-     * otherwise returns the argument unchanged.
-     */
-    <T> List<T> randomizeIfUnordered(List<T> elements,
-                                     boolean inPlaceAllowed);
-
-    /**
-     * If the evaluation context is testing serializability, ensures
-     * that the argument function is serializable and deserializable
-     * by encoding it and then decoding it, and returning the result.
-     * Otherwise returns the argument unchanged.
-     */
-    <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn);
-
-    /**
-     * If the evaluation context is testing serializability, ensures
-     * that the argument Coder is serializable and deserializable
-     * by encoding it and then decoding it, and returning the result.
-     * Otherwise returns the argument unchanged.
-     */
-    <T> Coder<T> ensureCoderSerializable(Coder<T> coder);
-
-    /**
-     * If the evaluation context is testing serializability, ensures
-     * that the given data is serializable and deserializable with the
-     * given Coder by encoding it and then decoding it, and returning
-     * the result. Otherwise returns the argument unchanged.
-     *
-     * <p>Error context is prefixed to any thrown exceptions.
-     */
-    <T> T ensureSerializableByCoder(Coder<T> coder,
-                                    T data, String errorContext);
-
-    /**
-     * Returns a mutator, which can be used to add additional counters to
-     * this EvaluationContext.
-     */
-    CounterSet.AddCounterMutator getAddCounterMutator();
-
-    /**
-     * Gets the step name for this transform.
-     */
-    public String getStepName(PTransform<?, ?> transform);
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  class Evaluator extends PipelineVisitor.Defaults implements EvaluationContext {
-    /**
-     * A map from PTransform to the step name of that transform. This is the internal name for the
-     * transform (e.g. "s2").
-     */
-    private final Map<PTransform<?, ?>, String> stepNames = new HashMap<>();
-    private final Map<PValue, Object> store = new HashMap<>();
-    private final CounterSet counters = new CounterSet();
-    private AppliedPTransform<?, ?, ?> currentTransform;
-
-    private Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = null;
-
-    /**
-     * A map from PTransform to the full name of that transform. This is the user name of the
-     * transform (e.g. "RemoveDuplicates/Combine/GroupByKey").
-     */
-    private final Map<PTransform<?, ?>, String> fullNames = new HashMap<>();
-
-    private Random rand;
-
-    public Evaluator() {
-      this(new Random());
-    }
-
-    public Evaluator(Random rand) {
-      this.rand = rand;
-    }
-
-    public void run(Pipeline pipeline) {
-      pipeline.traverseTopologically(this);
-      aggregatorSteps = new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
-    }
-
-    @Override
-    public DirectPipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
-      checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-          "can only be called with current transform");
-      return (InputT) currentTransform.getInput();
-    }
-
-    @Override
-    public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
-      checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-          "can only be called with current transform");
-      return (OutputT) currentTransform.getOutput();
-    }
-
-    @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
-      PTransform<?, ?> transform = node.getTransform();
-      fullNames.put(transform, node.getFullName());
-      TransformEvaluator evaluator =
-          getTransformEvaluator(transform.getClass());
-      if (evaluator == null) {
-        throw new IllegalStateException(
-            "no evaluator registered for " + transform);
-      }
-      LOG.debug("Evaluating {}", transform);
-      currentTransform = AppliedPTransform.of(
-          node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
-      evaluator.evaluate(transform, this);
-      currentTransform = null;
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
-      LOG.debug("Checking evaluation of {}", value);
-      if (value.getProducingTransformInternal() == null) {
-        throw new RuntimeException(
-            "internal error: expecting a PValue to have a producingTransform");
-      }
-      if (!producer.isCompositeNode()) {
-        // Verify that primitive transform outputs are already computed.
-        getPValue(value);
-      }
-    }
-
-    /**
-     * Sets the value of the given PValue.
-     * Throws an exception if the PValue's value has already been set.
-     */
-    void setPValue(PValue pvalue, Object contents) {
-      if (store.containsKey(pvalue)) {
-        throw new IllegalStateException(
-            "internal error: setting the value of " + pvalue
-            + " more than once");
-      }
-      store.put(pvalue, contents);
-    }
-
-    /**
-     * Retrieves the value of the given PValue.
-     * Throws an exception if the PValue's value hasn't already been set.
-     */
-    Object getPValue(PValue pvalue) {
-      if (!store.containsKey(pvalue)) {
-        throw new IllegalStateException(
-            "internal error: getting the value of " + pvalue
-            + " before it has been computed");
-      }
-      return store.get(pvalue);
-    }
-
-    /**
-     * Convert a list of T to a list of {@code ValueWithMetadata<T>}, with a timestamp of 0
-     * and null windows.
-     */
-    <T> List<ValueWithMetadata<T>> toValueWithMetadata(List<T> values) {
-      List<ValueWithMetadata<T>> result = new ArrayList<>(values.size());
-      for (T value : values) {
-        result.add(ValueWithMetadata.of(WindowedValue.valueInGlobalWindow(value)));
-      }
-      return result;
-    }
-
-    /**
-     * Convert a list of {@code WindowedValue<T>} to a list of {@code ValueWithMetadata<T>}.
-     */
-    <T> List<ValueWithMetadata<T>> toValueWithMetadataFromWindowedValue(
-        List<WindowedValue<T>> values) {
-      List<ValueWithMetadata<T>> result = new ArrayList<>(values.size());
-      for (WindowedValue<T> value : values) {
-        result.add(ValueWithMetadata.of(value));
-      }
-      return result;
-    }
-
-    @Override
-    public <T> void setPCollection(PCollection<T> pc, List<T> elements) {
-      setPCollectionValuesWithMetadata(pc, toValueWithMetadata(elements));
-    }
-
-    @Override
-    public <T> void setPCollectionWindowedValue(
-        PCollection<T> pc, List<WindowedValue<T>> elements) {
-      setPCollectionValuesWithMetadata(pc, toValueWithMetadataFromWindowedValue(elements));
-    }
-
-    @Override
-    public <T> void setPCollectionValuesWithMetadata(
-        PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
-      LOG.debug("Setting {} = {}", pc, elements);
-      ensurePCollectionEncodable(pc, elements);
-      setPValue(pc, elements);
-    }
-
-    @Override
-    public <ElemT, T, WindowedT> void setPCollectionView(
-        PCollectionView<T> view,
-        Iterable<WindowedValue<ElemT>> value) {
-      LOG.debug("Setting {} = {}", view, value);
-      setPValue(view, value);
-    }
-
-    /**
-     * Retrieves the value of the given {@link PCollection}.
-     * Throws an exception if the {@link PCollection}'s value hasn't already been set.
-     */
-    @Override
-    public <T> List<T> getPCollection(PCollection<T> pc) {
-      List<T> result = new ArrayList<>();
-      for (ValueWithMetadata<T> elem : getPCollectionValuesWithMetadata(pc)) {
-        result.add(elem.getValue());
-      }
-      return result;
-    }
-
-    @Override
-    public <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc) {
-      return Lists.transform(
-          getPCollectionValuesWithMetadata(pc),
-          new Function<ValueWithMetadata<T>, WindowedValue<T>>() {
-            @Override
-            public WindowedValue<T> apply(ValueWithMetadata<T> input) {
-              return input.getWindowedValue();
-            }});
-    }
-
-    @Override
-    public <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc) {
-      List<ValueWithMetadata<T>> elements = (List<ValueWithMetadata<T>>) getPValue(pc);
-      elements = randomizeIfUnordered(elements, false /* not inPlaceAllowed */);
-      LOG.debug("Getting {} = {}", pc, elements);
-      return elements;
-    }
-
-    @Override
-    public <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs) {
-      List<List<T>> elementsList = new ArrayList<>();
-      for (PCollection<T> pc : pcs.getAll()) {
-        elementsList.add(getPCollection(pc));
-      }
-      return elementsList;
-    }
-
-    /**
-     * Retrieves the value indicated by the given {@link PCollectionView}.
-     * Note that within the {@link DoFnContext} a {@link PCollectionView}
-     * converts from this representation to a suitable side input value.
-     */
-    @Override
-    public <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
-      Iterable<WindowedValue<?>> value = (Iterable<WindowedValue<?>>) getPValue(view);
-      LOG.debug("Getting {} = {}", view, value);
-      return value;
-    }
-
-    /**
-     * If {@code testEncodability}, ensures that the {@link PCollection}'s coder and elements are
-     * encodable and decodable by encoding them and decoding them, and returning the result.
-     * Otherwise returns the argument elements.
-     */
-    <T> List<ValueWithMetadata<T>> ensurePCollectionEncodable(
-        PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
-      ensureCoderSerializable(pc.getCoder());
-      if (!testEncodability) {
-        return elements;
-      }
-      List<ValueWithMetadata<T>> elementsCopy = new ArrayList<>(elements.size());
-      for (ValueWithMetadata<T> element : elements) {
-        elementsCopy.add(
-            element.withValue(ensureElementEncodable(pc, element.getValue())));
-      }
-      return elementsCopy;
-    }
-
-    @Override
-    public <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element) {
-      return ensureSerializableByCoder(
-          pvalue.getCoder(), element, "Within " + pvalue.toString());
-    }
-
-    @Override
-    public <T> List<T> randomizeIfUnordered(List<T> elements,
-                                            boolean inPlaceAllowed) {
-      if (!testUnorderedness) {
-        return elements;
-      }
-      List<T> elementsCopy = new ArrayList<>(elements);
-      Collections.shuffle(elementsCopy, rand);
-      return elementsCopy;
-    }
-
-    @Override
-    public <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn) {
-      if (!testSerializability) {
-        return fn;
-      }
-      return SerializableUtils.ensureSerializable(fn);
-    }
-
-    @Override
-    public <T> Coder<T> ensureCoderSerializable(Coder<T> coder) {
-      if (testSerializability) {
-        SerializableUtils.ensureSerializable(coder);
-      }
-      return coder;
-    }
-
-    @Override
-    public <T> T ensureSerializableByCoder(
-        Coder<T> coder, T value, String errorContext) {
-      if (testSerializability) {
-        return SerializableUtils.ensureSerializableByCoder(
-            coder, value, errorContext);
-      }
-      return value;
-    }
-
-    @Override
-    public CounterSet.AddCounterMutator getAddCounterMutator() {
-      return counters.getAddCounterMutator();
-    }
-
-    @Override
-    public String getStepName(PTransform<?, ?> transform) {
-      String stepName = stepNames.get(transform);
-      if (stepName == null) {
-        stepName = "s" + (stepNames.size() + 1);
-        stepNames.put(transform, stepName);
-      }
-      return stepName;
-    }
-
-    /**
-     * Returns the CounterSet generated during evaluation, which includes
-     * user-defined Aggregators and may include system-defined counters.
-     */
-    public CounterSet getCounters() {
-      return counters;
-    }
-
-    /**
-     * Returns JobState.DONE in all situations. The Evaluator is not returned
-     * until the pipeline has been traversed, so it will either be returned
-     * after a successful run or the run call will terminate abnormally.
-     */
-    @Override
-    public State getState() {
-      return State.DONE;
-    }
-
-    @Override
-    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
-      Map<String, T> stepValues = new HashMap<>();
-      for (PTransform<?, ?> step : aggregatorSteps.get(aggregator)) {
-        String stepName = String.format("user-%s-%s", stepNames.get(step), aggregator.getName());
-        String fullName = fullNames.get(step);
-        Counter<?> counter = counters.getExistingCounter(stepName);
-        if (counter == null) {
-          throw new IllegalArgumentException(
-              "Aggregator " + aggregator + " is not used in this pipeline");
-        }
-        stepValues.put(fullName, (T) counter.getAggregate());
-      }
-      return new MapAggregatorValues<>(stepValues);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * The key by which GBK groups inputs - elements are grouped by the encoded form of the key,
-   * but the original key may be accessed as well.
-   */
-  private static class GroupingKey<K> {
-    private K key;
-    private byte[] encodedKey;
-
-    public GroupingKey(K key, byte[] encodedKey) {
-      this.key = key;
-      this.encodedKey = encodedKey;
-    }
-
-    public K getKey() {
-      return key;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof GroupingKey) {
-        GroupingKey<?> that = (GroupingKey<?>) o;
-        return Arrays.equals(this.encodedKey, that.encodedKey);
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Arrays.hashCode(encodedKey);
-    }
-  }
-
-  private final DirectPipelineOptions options;
-  private boolean testSerializability;
-  private boolean testEncodability;
-  private boolean testUnorderedness;
-
-  /** Returns a new DirectPipelineRunner. */
-  private DirectPipelineRunner(DirectPipelineOptions options) {
-    this.options = options;
-    // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerStandardIOFactories(options);
-    long randomSeed;
-    if (options.getDirectPipelineRunnerRandomSeed() != null) {
-      randomSeed = options.getDirectPipelineRunnerRandomSeed();
-    } else {
-      randomSeed = new Random().nextLong();
-    }
-
-    LOG.debug("DirectPipelineRunner using random seed {}.", randomSeed);
-    rand = new Random(randomSeed);
-
-    testSerializability = options.isTestSerializability();
-    testEncodability = options.isTestEncodability();
-    testUnorderedness = options.isTestUnorderedness();
-  }
-
-  /**
-   * Get the options used in this {@link Pipeline}.
-   */
-  public DirectPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @Override
-  public String toString() {
-    return "DirectPipelineRunner#" + hashCode();
-  }
-
-  public static <K, V> void evaluateGroupByKeyOnly(
-      GroupByKeyOnly<K, V> transform,
-      EvaluationContext context) {
-    PCollection<KV<K, V>> input = context.getInput(transform);
-
-    List<ValueWithMetadata<KV<K, V>>> inputElems =
-        context.getPCollectionValuesWithMetadata(input);
-
-    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
-    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
-    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
-      K key = elem.getValue().getKey();
-      V value = elem.getValue().getValue();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            "unable to encode key " + key + " of input to " + transform
-            + " using " + keyCoder,
-            exn);
-      }
-      GroupingKey<K> groupingKey =
-          new GroupingKey<>(key, encodedKey);
-      List<V> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<V>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(value);
-    }
-
-    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
-        new ArrayList<>();
-    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
-      GroupingKey<K> groupingKey = entry.getKey();
-      K key = groupingKey.getKey();
-      List<V> values = entry.getValue();
-      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
-      outputElems.add(ValueWithMetadata
-                      .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
-                      .withKey(key));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
-                                             outputElems);
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public
-  static <K, V> void registerGroupByKeyOnly() {
-    registerDefaultTransformEvaluator(
-        GroupByKeyOnly.class,
-        new TransformEvaluator<GroupByKeyOnly>() {
-          @Override
-          public void evaluate(
-              GroupByKeyOnly transform,
-              EvaluationContext context) {
-            evaluateGroupByKeyOnly(transform, context);
-          }
-        });
-  }
-
-  static {
-    registerGroupByKeyOnly();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 7c6fed3..93917f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -20,16 +20,12 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableLikeCoder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * {@code Flatten<T>} takes multiple {@code PCollection<T>}s bundled
  * into a {@code PCollectionList<T>} and returns a single
@@ -189,32 +185,4 @@ public class Flatten {
           .setCoder(elemCoder);
     }
   }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        FlattenPCollectionList.class,
-        new DirectPipelineRunner.TransformEvaluator<FlattenPCollectionList>() {
-          @Override
-          public void evaluate(
-              FlattenPCollectionList transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  private static <T> void evaluateHelper(
-      FlattenPCollectionList<T> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    List<DirectPipelineRunner.ValueWithMetadata<T>> outputElems = new ArrayList<>();
-    PCollectionList<T> inputs = context.getInput(transform);
-
-    for (PCollection<T> input : inputs.getAll()) {
-      outputElems.addAll(context.getPCollectionValuesWithMetadata(input));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform), outputElems);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 511f0d8..cb7d372 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -21,27 +21,12 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.DirectModeExecutionContext;
-import org.apache.beam.sdk.util.DirectSideInputReader;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunnerBase;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.MutationDetector;
-import org.apache.beam.sdk.util.MutationDetectors;
-import org.apache.beam.sdk.util.PTuple;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.StringUtils;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -50,16 +35,10 @@ import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypedPValue;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
 
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
 
 /**
  * {@link ParDo} is the core element-wise transform in Google Cloud
@@ -84,7 +63,7 @@ import javax.annotation.Nullable;
  * <p>Conceptually, when a {@link ParDo} transform is executed, the
  * elements of the input {@link PCollection} are first divided up
  * into some number of "bundles". These are farmed off to distributed
- * worker machines (or run locally, if using the {@link DirectPipelineRunner}).
+ * worker machines (or run locally, if using the {@code DirectRunner}).
  * For each bundle of input elements processing proceeds as follows:
  *
  * <ol>
@@ -1072,288 +1051,11 @@ public class ParDo {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        Bound.class,
-        new DirectPipelineRunner.TransformEvaluator<Bound>() {
-          @Override
-          public void evaluate(
-              Bound transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateSingleHelper(transform, context);
-          }
-        });
-  }
-
-  private static <InputT, OutputT> void evaluateSingleHelper(
-      Bound<InputT, OutputT> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-
-    DirectModeExecutionContext executionContext = DirectModeExecutionContext.create();
-
-    PCollectionTuple outputs = PCollectionTuple.of(mainOutputTag, context.getOutput(transform));
-
-    evaluateHelper(
-        transform.fn,
-        context.getStepName(transform),
-        context.getInput(transform),
-        transform.sideInputs,
-        mainOutputTag,
-        Collections.<TupleTag<?>>emptyList(),
-        outputs,
-        context,
-        executionContext);
-
-    context.setPCollectionValuesWithMetadata(
-        context.getOutput(transform),
-        executionContext.getOutput(mainOutputTag));
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        BoundMulti.class,
-        new DirectPipelineRunner.TransformEvaluator<BoundMulti>() {
-          @Override
-          public void evaluate(
-              BoundMulti transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateMultiHelper(transform, context);
-          }
-        });
-  }
-
-  private static <InputT, OutputT> void evaluateMultiHelper(
-      BoundMulti<InputT, OutputT> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-
-    DirectModeExecutionContext executionContext = DirectModeExecutionContext.create();
-
-    evaluateHelper(
-        transform.fn,
-        context.getStepName(transform),
-        context.getInput(transform),
-        transform.sideInputs,
-        transform.mainOutputTag,
-        transform.sideOutputTags.getAll(),
-        context.getOutput(transform),
-        context,
-        executionContext);
-
-    for (Map.Entry<TupleTag<?>, PCollection<?>> entry
-        : context.getOutput(transform).getAll().entrySet()) {
-      @SuppressWarnings("unchecked")
-      TupleTag<Object> tag = (TupleTag<Object>) entry.getKey();
-      @SuppressWarnings("unchecked")
-      PCollection<Object> pc = (PCollection<Object>) entry.getValue();
-
-      context.setPCollectionValuesWithMetadata(
-          pc,
-          (tag == transform.mainOutputTag
-              ? executionContext.getOutput(tag)
-              : executionContext.getSideOutput(tag)));
-    }
-  }
-
-  /**
-   * Evaluates a single-output or multi-output {@link ParDo} directly.
-   *
-   * <p>This evaluation method is intended for use in testing scenarios; it is designed for clarity
-   * and correctness-checking, not speed.
-   *
-   * <p>Of particular note, this performs best-effort checking that inputs and outputs are not
-   * mutated in violation of the requirements upon a {@link DoFn}.
-   */
-  private static <InputT, OutputT, ActualInputT extends InputT> void evaluateHelper(
-      DoFn<InputT, OutputT> doFn,
-      String stepName,
-      PCollection<ActualInputT> input,
-      List<PCollectionView<?>> sideInputs,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      PCollectionTuple outputs,
-      DirectPipelineRunner.EvaluationContext context,
-      DirectModeExecutionContext executionContext) {
-    // TODO: Run multiple shards?
-    DoFn<InputT, OutputT> fn = context.ensureSerializable(doFn);
-
-    SideInputReader sideInputReader = makeSideInputReader(context, sideInputs);
-
-    // When evaluating via the DirectPipelineRunner, this output manager checks each output for
-    // illegal mutations when the next output comes along. We then verify again after finishBundle()
-    // The common case we expect this to catch is a user mutating an input in order to repeatedly
-    // emit "variations".
-    ImmutabilityCheckingOutputManager<ActualInputT> outputManager =
-        new ImmutabilityCheckingOutputManager<>(
-            fn.getClass().getSimpleName(),
-            new DoFnRunnerBase.ListOutputManager(),
-            outputs);
-
-    DoFnRunner<InputT, OutputT> fnRunner =
-        DoFnRunners.createDefault(
-            context.getPipelineOptions(),
-            fn,
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            sideOutputTags,
-            executionContext.getOrCreateStepContext(stepName, stepName),
-            context.getAddCounterMutator(),
-            input.getWindowingStrategy());
-
-    fnRunner.startBundle();
-
-    for (DirectPipelineRunner.ValueWithMetadata<ActualInputT> elem
-             : context.getPCollectionValuesWithMetadata(input)) {
-      if (elem.getValue() instanceof KV) {
-        // In case the DoFn needs keyed state, set the implicit keys to the keys
-        // in the input elements.
-        @SuppressWarnings("unchecked")
-        KV<?, ?> kvElem = (KV<?, ?>) elem.getValue();
-        executionContext.setKey(kvElem.getKey());
-      } else {
-        executionContext.setKey(elem.getKey());
-      }
-
-      // We check the input for mutations only through the call span of processElement.
-      // This will miss some cases, but the check is ad hoc and best effort. The common case
-      // is that the input is mutated to be used for output.
-      try {
-        MutationDetector inputMutationDetector = MutationDetectors.forValueWithCoder(
-            elem.getWindowedValue().getValue(), input.getCoder());
-        @SuppressWarnings("unchecked")
-        WindowedValue<InputT> windowedElem = ((WindowedValue<InputT>) elem.getWindowedValue());
-        fnRunner.processElement(windowedElem);
-        inputMutationDetector.verifyUnmodified();
-      } catch (CoderException e) {
-        throw UserCodeException.wrap(e);
-      } catch (IllegalMutationException exn) {
-        throw new IllegalMutationException(
-            String.format("DoFn %s mutated input value %s of class %s (new value was %s)."
-                + " Input values must not be mutated in any way.",
-                fn.getClass().getSimpleName(),
-                exn.getSavedValue(), exn.getSavedValue().getClass(), exn.getNewValue()),
-            exn.getSavedValue(),
-            exn.getNewValue(),
-            exn);
-      }
-    }
-
-    // Note that the input could have been retained and mutated prior to this final output,
-    // but for now it degrades readability too much to be worth trying to catch that particular
-    // corner case.
-    fnRunner.finishBundle();
-    outputManager.verifyLatestOutputsUnmodified();
-  }
-
-  private static SideInputReader makeSideInputReader(
-      DirectPipelineRunner.EvaluationContext context, List<PCollectionView<?>> sideInputs) {
-    PTuple sideInputValues = PTuple.empty();
-    for (PCollectionView<?> view : sideInputs) {
-      sideInputValues = sideInputValues.and(
-          view.getTagInternal(),
-          context.getPCollectionView(view));
-    }
-    return DirectSideInputReader.of(sideInputValues);
-  }
-
   private static void populateDisplayData(
       DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) {
     builder
         .include(fn)
         .add(DisplayData.item("fn", fnClass)
-          .withLabel("Transform Function"));
-  }
-
-  /**
-   * A {@code DoFnRunner.OutputManager} that provides facilities for checking output values for
-   * illegal mutations.
-   *
-   * <p>When used via the try-with-resources pattern, it is guaranteed that every value passed
-   * to {@link #output} will have been checked for illegal mutation.
-   */
-  private static class ImmutabilityCheckingOutputManager<InputT>
-      implements DoFnRunners.OutputManager, AutoCloseable {
-
-    private final DoFnRunners.OutputManager underlyingOutputManager;
-    private final ConcurrentMap<TupleTag<?>, MutationDetector> mutationDetectorForTag;
-    private final PCollectionTuple outputs;
-    private String doFnName;
-
-    public ImmutabilityCheckingOutputManager(
-        String doFnName,
-        DoFnRunners.OutputManager underlyingOutputManager,
-        PCollectionTuple outputs) {
-      this.doFnName = doFnName;
-      this.underlyingOutputManager = underlyingOutputManager;
-      this.outputs = outputs;
-      this.mutationDetectorForTag = Maps.newConcurrentMap();
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-
-      // Skip verifying undeclared outputs, since we don't have coders for them.
-      if (outputs.has(tag)) {
-        try {
-          MutationDetector newDetector =
-              MutationDetectors.forValueWithCoder(
-                  output.getValue(), outputs.get(tag).getCoder());
-          MutationDetector priorDetector = mutationDetectorForTag.put(tag, newDetector);
-          verifyOutputUnmodified(priorDetector);
-        } catch (CoderException e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      // Actually perform the output.
-      underlyingOutputManager.output(tag, output);
-    }
-
-    /**
-     * Throws {@link IllegalMutationException} if the prior output for any tag has been mutated
-     * since being output.
-     */
-    public void verifyLatestOutputsUnmodified() {
-      for (MutationDetector detector : mutationDetectorForTag.values()) {
-        verifyOutputUnmodified(detector);
-      }
-    }
-
-    /**
-     * Adapts the error message from the provided {@code detector}.
-     *
-     * <p>The {@code detector} may be null, in which case no check is performed. This is merely
-     * to consolidate null checking to this method.
-     */
-    private <T> void verifyOutputUnmodified(@Nullable MutationDetector detector) {
-      if (detector == null) {
-        return;
-      }
-
-      try {
-        detector.verifyUnmodified();
-      } catch (IllegalMutationException exn) {
-        throw new IllegalMutationException(String.format(
-            "DoFn %s mutated value %s after it was output (new value was %s)."
-                + " Values must not be mutated in any way after being output.",
-                doFnName, exn.getSavedValue(), exn.getNewValue()),
-            exn.getSavedValue(), exn.getNewValue(),
-            exn);
-      }
-    }
-
-    /**
-     * When used in a {@code try}-with-resources block, verifies all of the latest outputs upon
-     * {@link #close()}.
-     */
-    @Override
-    public void close() {
-      verifyLatestOutputsUnmodified();
-    }
+            .withLabel("Transform Function"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 3df915b..7a97c13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -445,27 +443,5 @@ public class View {
     public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
       return view;
     }
-
-    static {
-      DirectPipelineRunner.registerDefaultTransformEvaluator(
-          CreatePCollectionView.class,
-          new DirectPipelineRunner.TransformEvaluator<CreatePCollectionView>() {
-            @SuppressWarnings("rawtypes")
-            @Override
-            public void evaluate(
-                CreatePCollectionView transform,
-                DirectPipelineRunner.EvaluationContext context) {
-              evaluateTyped(transform, context);
-            }
-
-            private <ElemT, ViewT> void evaluateTyped(
-                CreatePCollectionView<ElemT, ViewT> transform,
-                DirectPipelineRunner.EvaluationContext context) {
-              List<WindowedValue<ElemT>> elems =
-                  context.getPCollectionWindowedValues(context.getInput(transform));
-              context.setPCollectionView(context.getOutput(transform), elems);
-            }
-          });
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
deleted file mode 100644
index 85e36dd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link ExecutionContext} for use in direct mode.
- */
-public class DirectModeExecutionContext
-    extends BaseExecutionContext<DirectModeExecutionContext.StepContext> {
-
-  private Object key;
-  private List<ValueWithMetadata<?>> output = Lists.newArrayList();
-  private Map<TupleTag<?>, List<ValueWithMetadata<?>>> sideOutputs = Maps.newHashMap();
-
-  protected DirectModeExecutionContext() {}
-
-  public static DirectModeExecutionContext create() {
-    return new DirectModeExecutionContext();
-  }
-
-  @Override
-  protected StepContext createStepContext(String stepName, String transformName) {
-    return new StepContext(this, stepName, transformName);
-  }
-
-  public Object getKey() {
-    return key;
-  }
-
-  public void setKey(Object newKey) {
-    // The direct mode runner may reorder elements, so we need to keep
-    // around the state used for each key.
-    for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
-      ((StepContext) stepContext).switchKey(newKey);
-    }
-    key = newKey;
-  }
-
-  @Override
-  public void noteOutput(WindowedValue<?> outputElem) {
-    output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
-  }
-
-  @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> outputElem) {
-    List<ValueWithMetadata<?>> output = sideOutputs.get(tag);
-    if (output == null) {
-      output = Lists.newArrayList();
-      sideOutputs.put(tag, output);
-    }
-    output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
-  }
-
-  public <T> List<ValueWithMetadata<T>> getOutput(@SuppressWarnings("unused") TupleTag<T> tag) {
-    @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
-    List<ValueWithMetadata<T>> typedOutput = (List) output;
-    return typedOutput;
-  }
-
-  public <T> List<ValueWithMetadata<T>> getSideOutput(TupleTag<T> tag) {
-    if (sideOutputs.containsKey(tag)) {
-      @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
-      List<ValueWithMetadata<T>> typedOutput = (List) sideOutputs.get(tag);
-      return typedOutput;
-    } else {
-      return Lists.newArrayList();
-    }
-  }
-
-  /**
-   * {@link ExecutionContext.StepContext} used in direct mode.
-   */
-  public static class StepContext extends BaseExecutionContext.StepContext {
-
-    /** A map from each key to the state associated with it. */
-    private final Map<Object, InMemoryStateInternals<Object>> stateInternals = Maps.newHashMap();
-    private InMemoryStateInternals<Object> currentStateInternals = null;
-
-    private StepContext(ExecutionContext executionContext, String stepName, String transformName) {
-      super(executionContext, stepName, transformName);
-      switchKey(null);
-    }
-
-    public void switchKey(Object newKey) {
-      currentStateInternals = stateInternals.get(newKey);
-      if (currentStateInternals == null) {
-        currentStateInternals = InMemoryStateInternals.forKey(newKey);
-        stateInternals.put(newKey, currentStateInternals);
-      }
-    }
-
-    @Override
-    public StateInternals<Object> stateInternals() {
-      return checkNotNull(currentStateInternals);
-    }
-
-    @Override
-    public TimerInternals timerInternals() {
-      throw new UnsupportedOperationException("Direct mode cannot return timerInternals");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf476e12/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 75861fe..58b10a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;