Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:47:55 UTC

[08/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
deleted file mode 100644
index 57e6116..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ /dev/null
@@ -1,1320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.ListCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Partition;
-import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
-import com.google.cloud.dataflow.sdk.util.AssignWindows;
-import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
-import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner;
-import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.TestCredential;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Executes the operations in the pipeline directly, in this process, without
- * any optimization.  Useful for small local execution and tests.
- *
- * <p>Throws an exception from {@link #run} if execution fails.
- *
- * <h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code DirectPipelineRunner}, the Cloud Platform account that you configured with the
- * <a href="https://cloud.google.com/sdk/gcloud">gcloud</a> executable will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
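- *
- * <p>A minimal usage sketch (the input and output paths below are purely
- * illustrative):
- *
- * <pre>{@code
- * DirectPipelineOptions options =
- *     PipelineOptionsFactory.as(DirectPipelineOptions.class);
- * Pipeline p = Pipeline.create(options);
- * p.apply(TextIO.Read.from("/tmp/input.txt"))
- *  .apply(TextIO.Write.to("/tmp/output"));
- * DirectPipelineRunner.fromOptions(options).run(p);
- * }</pre>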
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DirectPipelineRunner
-    extends PipelineRunner<DirectPipelineRunner.EvaluationResults> {
-  private static final Logger LOG = LoggerFactory.getLogger(DirectPipelineRunner.class);
-
-  /**
-   * A source of random data, which can be seeded if determinism is desired.
-   */
-  private Random rand;
-
-  /**
-   * A map from PTransform class to the corresponding
-   * TransformEvaluator to use to evaluate that transform.
-   *
-   * <p>A static map that contains system-wide defaults.
-   */
-  private static Map<Class, TransformEvaluator> defaultTransformEvaluators =
-      new HashMap<>();
-
-  /**
-   * A map from PTransform class to the corresponding
-   * TransformEvaluator to use to evaluate that transform.
-   *
-   * <p>An instance map that contains bindings for this DirectPipelineRunner.
-   * Bindings in this map override those in the default map.
-   */
-  private Map<Class, TransformEvaluator> localTransformEvaluators =
-      new HashMap<>();
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be evaluated by default by the corresponding
-   * TransformEvaluator.
-   */
-  public static <TransformT extends PTransform<?, ?>>
-  void registerDefaultTransformEvaluator(
-      Class<TransformT> transformClass,
-      TransformEvaluator<? super TransformT> transformEvaluator) {
-    if (defaultTransformEvaluators.put(transformClass, transformEvaluator)
-        != null) {
-      throw new IllegalArgumentException(
-          "defining multiple evaluators for " + transformClass);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be evaluated by the corresponding TransformEvaluator.
-   * Overrides any bindings specified by
-   * {@link #registerDefaultTransformEvaluator}.
-   */
-  public <TransformT extends PTransform<?, ?>>
-  void registerTransformEvaluator(
-      Class<TransformT> transformClass,
-      TransformEvaluator<TransformT> transformEvaluator) {
-    if (localTransformEvaluators.put(transformClass, transformEvaluator)
-        != null) {
-      throw new IllegalArgumentException(
-          "defining multiple evaluators for " + transformClass);
-    }
-  }
-
-  /**
-   * Returns the TransformEvaluator to use for instances of the
-   * specified PTransform class, or null if none registered.
-   */
-  public <TransformT extends PTransform<?, ?>>
-      TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> transformClass) {
-    TransformEvaluator<TransformT> transformEvaluator =
-        localTransformEvaluators.get(transformClass);
-    if (transformEvaluator == null) {
-      transformEvaluator = defaultTransformEvaluators.get(transformClass);
-    }
-    return transformEvaluator;
-  }
-
-  /**
-   * Constructs a DirectPipelineRunner from the given options.
-   */
-  public static DirectPipelineRunner fromOptions(PipelineOptions options) {
-    DirectPipelineOptions directOptions =
-        PipelineOptionsValidator.validate(DirectPipelineOptions.class, options);
-    LOG.debug("Creating DirectPipelineRunner");
-    return new DirectPipelineRunner(directOptions);
-  }
-
-  /**
-   * Constructs a runner with default properties for testing.
-   *
-   * @return The newly created runner.
-   */
-  public static DirectPipelineRunner createForTest() {
-    DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
-    options.setStableUniqueNames(CheckEnabled.ERROR);
-    options.setGcpCredential(new TestCredential());
-    return new DirectPipelineRunner(options);
-  }
-
-  /**
-   * Enable runtime testing to verify that all functions and {@link Coder}
-   * instances can be serialized.
-   *
-   * <p>Enabled by default.
-   *
-   * <p>This method modifies the {@code DirectPipelineRunner} instance and
-   * returns itself.
-   */
-  public DirectPipelineRunner withSerializabilityTesting(boolean enable) {
-    this.testSerializability = enable;
-    return this;
-  }
-
-  /**
-   * Enable runtime testing to verify that all values can be encoded.
-   *
-   * <p>Enabled by default.
-   *
-   * <p>This method modifies the {@code DirectPipelineRunner} instance and
-   * returns itself.
-   */
-  public DirectPipelineRunner withEncodabilityTesting(boolean enable) {
-    this.testEncodability = enable;
-    return this;
-  }
-
-  /**
-   * Enable runtime testing to verify that functions do not depend on order
-   * of the elements.
-   *
-   * <p>This is accomplished by randomizing the order of elements.
-   *
-   * <p>Enabled by default.
-   *
-   * <p>This method modifies the {@code DirectPipelineRunner} instance and
-   * returns itself.
-   */
-  public DirectPipelineRunner withUnorderednessTesting(boolean enable) {
-    this.testUnorderedness = enable;
-    return this;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (transform instanceof Combine.GroupedValues) {
-      return (OutputT) applyTestCombine((Combine.GroupedValues) transform, (PCollection) input);
-    } else if (transform instanceof TextIO.Write.Bound) {
-      return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input);
-    } else if (transform instanceof AvroIO.Write.Bound) {
-      return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input);
-    } else if (transform instanceof GroupByKey) {
-      return (OutputT)
-          ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
-    } else if (transform instanceof Window.Bound) {
-      return (OutputT)
-          ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform));
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-  private <K, InputT, AccumT, OutputT> PCollection<KV<K, OutputT>> applyTestCombine(
-      Combine.GroupedValues<K, InputT, OutputT> transform,
-      PCollection<KV<K, Iterable<InputT>>> input) {
-
-    PCollection<KV<K, OutputT>> output = input
-        .apply(ParDo.of(TestCombineDoFn.create(transform, input, testSerializability, rand))
-            .withSideInputs(transform.getSideInputs()));
-
-    try {
-      output.setCoder(transform.getDefaultOutputCoder(input));
-    } catch (CannotProvideCoderException exc) {
-      // let coder inference occur later, if it can
-    }
-    return output;
-  }
-
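-  /**
-   * A {@link PartitionFn} that assigns elements to partitions in round-robin
-   * order as they are processed, spreading elements evenly across shards.
-   */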
-  private static class ElementProcessingOrderPartitionFn<T> implements PartitionFn<T> {
-    private int elementNumber;
-    @Override
-    public int partitionFor(T elem, int numPartitions) {
-      return elementNumber++ % numPartitions;
-    }
-  }
-
-  /**
-   * Applies TextIO.Write honoring user-requested sharding controls (i.e., withNumShards)
-   * by applying a partition function based upon the number of shards the user requested.
-   */
-  private static class DirectTextIOWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final TextIO.Write.Bound<T> transform;
-
-    private DirectTextIOWrite(TextIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      checkState(transform.getNumShards() > 1,
-          "DirectTextIOWrite is expected to only be used when sharding controls are required.");
-
-      // Evenly distribute all the elements across the partitions.
-      PCollectionList<T> partitionedElements =
-          input.apply(Partition.of(transform.getNumShards(),
-                                   new ElementProcessingOrderPartitionFn<T>()));
-
-      // For each input PCollection partition, create a write transform that represents
-      // one of the specific shards.
-      for (int i = 0; i < transform.getNumShards(); ++i) {
-        /*
-         * This logic mirrors the file naming strategy within
-         * {@link FileBasedSink#generateDestinationFilenames()}
-         */
-        String outputFilename = IOChannelUtils.constructName(
-            transform.getFilenamePrefix(),
-            transform.getShardNameTemplate(),
-            getFileExtension(transform.getFilenameSuffix()),
-            i,
-            transform.getNumShards());
-
-        String transformName = String.format("%s(Shard:%s)", transform.getName(), i);
-        partitionedElements.get(i).apply(transformName,
-            transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename));
-      }
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-  /**
-   * Returns the file extension to be used. If the user did not request a file
-   * extension then this method returns the empty string. Otherwise this method
-   * adds a {@code "."} to the beginning of the user's extension if one is not present.
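-   * For example, {@code "txt"} becomes {@code ".txt"}, while {@code ".txt"}
-   * is returned unchanged.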
-   *
-   * <p>This is copied from {@link FileBasedSink} to not expose it.
-   */
-  private static String getFileExtension(String usersExtension) {
-    if (usersExtension == null || usersExtension.isEmpty()) {
-      return "";
-    }
-    if (usersExtension.startsWith(".")) {
-      return usersExtension;
-    }
-    return "." + usersExtension;
-  }
-
-  /**
-   * Applies the override for TextIO.Write.Bound if the user requested more
-   * than one shard.
-   */
-  private <T> PDone applyTextIOWrite(TextIO.Write.Bound<T> transform, PCollection<T> input) {
-    if (transform.getNumShards() <= 1) {
-      // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never
-      // requested sharding controls greater than 1, we default to outputting to 1 file.
-      return super.apply(transform.withNumShards(1), input);
-    }
-    return input.apply(new DirectTextIOWrite<>(transform));
-  }
-
-  /**
-   * Applies AvroIO.Write honoring user-requested sharding controls (i.e., withNumShards)
-   * by applying a partition function based upon the number of shards the user requested.
-   */
-  private static class DirectAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final AvroIO.Write.Bound<T> transform;
-
-    private DirectAvroIOWrite(AvroIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      checkState(transform.getNumShards() > 1,
-          "DirectAvroIOWrite is expected to only be used when sharding controls are required.");
-
-      // Evenly distribute all the elements across the partitions.
-      PCollectionList<T> partitionedElements =
-          input.apply(Partition.of(transform.getNumShards(),
-                                   new ElementProcessingOrderPartitionFn<T>()));
-
-      // For each input PCollection partition, create a write transform that represents
-      // one of the specific shards.
-      for (int i = 0; i < transform.getNumShards(); ++i) {
-        /*
-         * This logic mirrors the file naming strategy within
-         * {@link FileBasedSink#generateDestinationFilenames()}
-         */
-        String outputFilename = IOChannelUtils.constructName(
-            transform.getFilenamePrefix(),
-            transform.getShardNameTemplate(),
-            getFileExtension(transform.getFilenameSuffix()),
-            i,
-            transform.getNumShards());
-
-        String transformName = String.format("%s(Shard:%s)", transform.getName(), i);
-        partitionedElements.get(i).apply(transformName,
-            transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename));
-      }
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
-      extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private final Window.Bound<T> wrapped;
-
-    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
-      this.wrapped = wrapped;
-    }
-
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
-      WindowFn<T, BoundedWindow> windowFn =
-          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
-      // If the Window.Bound transform only changed parts other than the WindowFn, then
-      // we skip AssignWindows even though it should be harmless in a perfect world.
-      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
-      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
-      // AssignWindows in this case.
-      if (wrapped.getWindowFn() == null) {
-        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
-            .setWindowingStrategyInternal(outputStrategy);
-      } else {
-        return input
-            .apply("AssignWindows", new AssignWindows<T, BoundedWindow>(windowFn))
-            .setWindowingStrategyInternal(outputStrategy);
-      }
-    }
-  }
-
-  private static class IdentityFn<T> extends DoFn<T, T> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
-
-  /**
-   * Applies the override for AvroIO.Write.Bound if the user requested more
-   * than one shard.
-   */
-  private <T> PDone applyAvroIOWrite(AvroIO.Write.Bound<T> transform, PCollection<T> input) {
-    if (transform.getNumShards() <= 1) {
-      // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never
-      // requested sharding controls greater than 1, we default to outputting to 1 file.
-      return super.apply(transform.withNumShards(1), input);
-    }
-    return input.apply(new DirectAvroIOWrite<>(transform));
-  }
-
-  /**
-   * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE and EXTRACT phases
-   * (see {@code com.google.cloud.dataflow.sdk.runners.worker.CombineValuesFn}). In order to emulate
-   * this for the {@link DirectPipelineRunner} and provide an experience closer to the service, go
-   * through heavy serializability checks for the equivalent of the results of the ADD phase, but
-   * after the {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} shuffle, and the MERGE
-   * phase. Doing these checks ensures that not only is the accumulator coder serializable, but
-   * the accumulator coder can actually serialize the data in question.
-   */
-  public static class TestCombineDoFn<K, InputT, AccumT, OutputT>
-      extends DoFn<KV<K, Iterable<InputT>>, KV<K, OutputT>> {
-    private final PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner;
-    private final Coder<AccumT> accumCoder;
-    private final boolean testSerializability;
-    private final Random rand;
-
-    public static <K, InputT, AccumT, OutputT> TestCombineDoFn<K, InputT, AccumT, OutputT> create(
-        Combine.GroupedValues<K, InputT, OutputT> transform,
-        PCollection<KV<K, Iterable<InputT>>> input,
-        boolean testSerializability,
-        Random rand) {
-
-      AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn = transform.getAppliedFn(
-          input.getPipeline().getCoderRegistry(), input.getCoder(), input.getWindowingStrategy());
-
-      return new TestCombineDoFn(
-          PerKeyCombineFnRunners.create(fn.getFn()),
-          fn.getAccumulatorCoder(),
-          testSerializability,
-          rand);
-    }
-
-    public TestCombineDoFn(
-        PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner,
-        Coder<AccumT> accumCoder,
-        boolean testSerializability,
-        Random rand) {
-      this.fnRunner = fnRunner;
-      this.accumCoder = accumCoder;
-      this.testSerializability = testSerializability;
-      this.rand = rand;
-
-      // Check that this does not crash, specifically to catch anonymous CustomCoder subclasses.
-      this.accumCoder.getEncodingId();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      K key = c.element().getKey();
-      Iterable<InputT> values = c.element().getValue();
-      List<AccumT> groupedPostShuffle =
-          ensureSerializableByCoder(ListCoder.of(accumCoder),
-              addInputsRandomly(fnRunner, key, values, rand, c),
-              "After addInputs of KeyedCombineFn " + fnRunner.fn().toString());
-      AccumT merged =
-          ensureSerializableByCoder(accumCoder,
-            fnRunner.mergeAccumulators(key, groupedPostShuffle, c),
-            "After mergeAccumulators of KeyedCombineFn " + fnRunner.fn().toString());
-      // Note: The serializability of KV<K, OutputT> is ensured by the
-      // runner itself, since it's a transform output.
-      c.output(KV.of(key, fnRunner.extractOutput(key, merged, c)));
-    }
-
-    /**
-     * Creates a random list of accumulators from the given list of values.
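-     *
-     * <p>The inputs are partitioned into a random number of accumulators, and
-     * roughly half of the accumulators are compacted, so that the MERGE phase
-     * is exercised with uneven groupings.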
-     *
-     * <p>Visible for testing purposes only.
-     */
-    public static <K, AccumT, InputT> List<AccumT> addInputsRandomly(
-        PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, ?> fnRunner,
-        K key,
-        Iterable<InputT> values,
-        Random random,
-        DoFn<?, ?>.ProcessContext c) {
-      List<AccumT> out = new ArrayList<AccumT>();
-      int i = 0;
-      AccumT accumulator = fnRunner.createAccumulator(key, c);
-      boolean hasInput = false;
-
-      for (InputT value : values) {
-        accumulator = fnRunner.addInput(key, accumulator, value, c);
-        hasInput = true;
-
-        // For each index i, flip a 1/2^i weighted coin for whether to
-        // create a new accumulator after index i is added, i.e. [0]
-        // is guaranteed, [1] is an even 1/2, [2] is 1/4, etc. The
-        // goal is to partition the inputs into accumulators, and make
-        // the accumulators potentially lumpy.  Also compact about half
-        // of the accumulators.
-        if (i == 0 || random.nextInt(1 << Math.min(i, 30)) == 0) {
-          if (i % 2 == 0) {
-            accumulator = fnRunner.compact(key, accumulator, c);
-          }
-          out.add(accumulator);
-          accumulator = fnRunner.createAccumulator(key, c);
-          hasInput = false;
-        }
-        i++;
-      }
-      if (hasInput) {
-        out.add(accumulator);
-      }
-
-      Collections.shuffle(out, random);
-      return out;
-    }
-
-    public <T> T ensureSerializableByCoder(
-        Coder<T> coder, T value, String errorContext) {
-      if (testSerializability) {
-        return SerializableUtils.ensureSerializableByCoder(
-            coder, value, errorContext);
-      }
-      return value;
-    }
-  }
-
-  @Override
-  public EvaluationResults run(Pipeline pipeline) {
-    LOG.info("Executing pipeline using the DirectPipelineRunner.");
-
-    Evaluator evaluator = new Evaluator(rand);
-    evaluator.run(pipeline);
-
-    // Log all counter values for debugging purposes.
-    for (Counter counter : evaluator.getCounters()) {
-      LOG.info("Final aggregator value: {}", counter);
-    }
-
-    LOG.info("Pipeline execution complete.");
-
-    return evaluator;
-  }
-
-  /**
-   * An evaluator of a PTransform.
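-   *
-   * <p>See the anonymous {@code GroupByKeyOnly} evaluator registered in
-   * {@link #registerGroupByKeyOnly} for an example implementation.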
-   */
-  public interface TransformEvaluator<TransformT extends PTransform> {
-    public void evaluate(TransformT transform,
-                         EvaluationContext context);
-  }
-
-  /**
-   * The interface provided to registered callbacks for interacting
-   * with the {@code DirectPipelineRunner}, including reading and writing the
-   * values of {@link PCollection}s and {@link PCollectionView}s.
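-   *
-   * <p>For example, a test might read back the contents of a
-   * {@link PCollection} (here {@code p} and {@code output} are assumed to be
-   * built elsewhere):
-   *
-   * <pre>{@code
-   * EvaluationResults results = DirectPipelineRunner.createForTest().run(p);
-   * List<String> lines = results.getPCollection(output);
-   * }</pre>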
-   */
-  public interface EvaluationResults extends PipelineResult {
-    /**
-     * Retrieves the value of the given PCollection.
-     * Throws an exception if the PCollection's value hasn't already been set.
-     */
-    <T> List<T> getPCollection(PCollection<T> pc);
-
-    /**
-     * Retrieves the windowed value of the given PCollection.
-     * Throws an exception if the PCollection's value hasn't already been set.
-     */
-    <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc);
-
-    /**
-     * Retrieves the values of each PCollection in the given
-     * PCollectionList. Throws an exception if the PCollectionList's
-     * value hasn't already been set.
-     */
-    <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs);
-
-    /**
-     * Retrieves the values indicated by the given {@link PCollectionView}.
-     * Note that within the {@link com.google.cloud.dataflow.sdk.transforms.DoFn.Context}
-     * implementation a {@link PCollectionView} should convert from this representation to a
-     * suitable side input value.
-     */
-    <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view);
-  }
-
-  /**
-   * An immutable (value, timestamp) pair, along with other metadata necessary
-   * for the implementation of {@code DirectPipelineRunner}.
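-   *
-   * <p>Instances are typically built as
-   * {@code ValueWithMetadata.of(windowedValue).withKey(key)}.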
-   */
-  public static class ValueWithMetadata<V> {
-    /**
-     * Returns a new {@code ValueWithMetadata} wrapping the given
-     * {@code WindowedValue}, with a null key.
-     */
-    public static <V> ValueWithMetadata<V> of(WindowedValue<V> windowedValue) {
-      return new ValueWithMetadata<>(windowedValue, null);
-    }
-
-    /**
-     * Returns a new {@code ValueWithMetadata} with the implicit key associated
-     * with this value set.  The key is the last key grouped by in the chain of
-     * productions that produced this element.
-     * These keys are used internally by {@link DirectPipelineRunner} for keeping
-     * persisted state separate across keys.
-     */
-    public ValueWithMetadata<V> withKey(Object key) {
-      return new ValueWithMetadata<>(windowedValue, key);
-    }
-
-    /**
-     * Returns a new {@code ValueWithMetadata} that is a copy of this one, but with
-     * a different value.
-     */
-    public <T> ValueWithMetadata<T> withValue(T value) {
-      return new ValueWithMetadata(windowedValue.withValue(value), getKey());
-    }
-
-    /**
-     * Returns the {@code WindowedValue} associated with this element.
-     */
-    public WindowedValue<V> getWindowedValue() {
-      return windowedValue;
-    }
-
-    /**
-     * Returns the value associated with this element.
-     *
-     * @see #withValue
-     */
-    public V getValue() {
-      return windowedValue.getValue();
-    }
-
-    /**
-     * Returns the timestamp associated with this element.
-     */
-    public Instant getTimestamp() {
-      return windowedValue.getTimestamp();
-    }
-
-    /**
-     * Returns the collection of windows this element has been placed into.  May
-     * be null if the {@code PCollection} this element is in has not yet been
-     * windowed.
-     */
-    public Collection<? extends BoundedWindow> getWindows() {
-      return windowedValue.getWindows();
-    }
-
-    /**
-     * Returns the key associated with this element.  May be null if the
-     * {@code PCollection} this element is in is not keyed.
-     *
-     * @see #withKey
-     */
-    public Object getKey() {
-      return key;
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-
-    private final Object key;
-    private final WindowedValue<V> windowedValue;
-
-    private ValueWithMetadata(WindowedValue<V> windowedValue,
-                              Object key) {
-      this.windowedValue = windowedValue;
-      this.key = key;
-    }
-  }
-
-  /**
-   * The interface provided to registered callbacks for interacting
-   * with the {@code DirectPipelineRunner}, including reading and writing the
-   * values of {@link PCollection}s and {@link PCollectionView}s.
-   */
-  public interface EvaluationContext extends EvaluationResults {
-    /**
-     * Returns the configured pipeline options.
-     */
-    DirectPipelineOptions getPipelineOptions();
-
-    /**
-     * Returns the input of the transform currently being processed.
-     */
-    <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
-    /**
-     * Returns the output of the transform currently being processed.
-     */
-    <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
-    /**
-     * Sets the value of the given PCollection, where each element also has a timestamp
-     * and collection of windows.
-     * Throws an exception if the PCollection's value has already been set.
-     */
-    <T> void setPCollectionValuesWithMetadata(
-        PCollection<T> pc, List<ValueWithMetadata<T>> elements);
-
-    /**
-     * Sets the value of the given PCollection, where each element also has a timestamp
-     * and collection of windows.
-     * Throws an exception if the PCollection's value has already been set.
-     */
-    <T> void setPCollectionWindowedValue(PCollection<T> pc, List<WindowedValue<T>> elements);
-
-    /**
-     * Shorthand for setting the value of a PCollection where the elements do not have
-     * timestamps or windows.
-     * Throws an exception if the PCollection's value has already been set.
-     */
-    <T> void setPCollection(PCollection<T> pc, List<T> elements);
-
-    /**
-     * Retrieves the value of the given PCollection, along with element metadata
-     * such as timestamps and windows.
-     * Throws an exception if the PCollection's value hasn't already been set.
-     */
-    <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc);
-
-    /**
-     * Sets the value associated with the given {@link PCollectionView}.
-     * Throws an exception if the {@link PCollectionView}'s value has already been set.
-     */
-    <ElemT, T, WindowedT> void setPCollectionView(
-        PCollectionView<T> pc,
-        Iterable<WindowedValue<ElemT>> value);
-
-    /**
-     * Ensures that the element is encodable and decodable using the
-     * TypedPValue's coder, by encoding it and decoding it, and
-     * returning the result.
-     */
-    <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element);
-
-    /**
-     * If the evaluation context is testing unorderedness, randomly
-     * permutes the order of the elements (in a copy, if
-     * {@code inPlaceAllowed} is false) and returns the permuted list;
-     * otherwise returns the argument unchanged.
-     */
-    <T> List<T> randomizeIfUnordered(List<T> elements,
-                                     boolean inPlaceAllowed);
-
-    /**
-     * If the evaluation context is testing serializability, ensures
-     * that the argument function is serializable and deserializable
-     * by encoding it and then decoding it, and returning the result.
-     * Otherwise returns the argument unchanged.
-     */
-    <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn);
-
-    /**
-     * If the evaluation context is testing serializability, ensures
-     * that the argument Coder is serializable and deserializable
-     * by encoding it and then decoding it, and returning the result.
-     * Otherwise returns the argument unchanged.
-     */
-    <T> Coder<T> ensureCoderSerializable(Coder<T> coder);
-
-    /**
-     * If the evaluation context is testing serializability, ensures
-     * that the given data is serializable and deserializable with the
-     * given Coder by encoding it and then decoding it, and returning
-     * the result. Otherwise returns the argument unchanged.
-     *
-     * <p>Error context is prefixed to any thrown exceptions.
-     */
-    <T> T ensureSerializableByCoder(Coder<T> coder,
-                                    T data, String errorContext);
-
-    /**
-     * Returns a mutator, which can be used to add additional counters to
-     * this EvaluationContext.
-     */
-    CounterSet.AddCounterMutator getAddCounterMutator();
-
-    /**
-     * Gets the step name for this transform.
-     */
-    public String getStepName(PTransform<?, ?> transform);
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  class Evaluator implements PipelineVisitor, EvaluationContext {
-    /**
-     * A map from PTransform to the step name of that transform. This is the internal name for the
-     * transform (e.g. "s2").
-     */
-    private final Map<PTransform<?, ?>, String> stepNames = new HashMap<>();
-    private final Map<PValue, Object> store = new HashMap<>();
-    private final CounterSet counters = new CounterSet();
-    private AppliedPTransform<?, ?, ?> currentTransform;
-
-    private Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = null;
-
-    /**
-     * A map from PTransform to the full name of that transform. This is the user name of the
-     * transform (e.g. "RemoveDuplicates/Combine/GroupByKey").
-     */
-    private final Map<PTransform<?, ?>, String> fullNames = new HashMap<>();
-
-    private Random rand;
-
-    public Evaluator() {
-      this(new Random());
-    }
-
-    public Evaluator(Random rand) {
-      this.rand = rand;
-    }
-
-    public void run(Pipeline pipeline) {
-      pipeline.traverseTopologically(this);
-      aggregatorSteps = new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
-    }
-
-    @Override
-    public DirectPipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
-      checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-          "can only be called with current transform");
-      return (InputT) currentTransform.getInput();
-    }
-
-    @Override
-    public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
-      checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
-          "can only be called with current transform");
-      return (OutputT) currentTransform.getOutput();
-    }
-
-    @Override
-    public void enterCompositeTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
-    }
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-      PTransform<?, ?> transform = node.getTransform();
-      fullNames.put(transform, node.getFullName());
-      TransformEvaluator evaluator =
-          getTransformEvaluator(transform.getClass());
-      if (evaluator == null) {
-        throw new IllegalStateException(
-            "no evaluator registered for " + transform);
-      }
-      LOG.debug("Evaluating {}", transform);
-      currentTransform = AppliedPTransform.of(
-          node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
-      evaluator.evaluate(transform, this);
-      currentTransform = null;
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
-      LOG.debug("Checking evaluation of {}", value);
-      if (value.getProducingTransformInternal() == null) {
-        throw new RuntimeException(
-            "internal error: expecting a PValue " +
-            "to have a producingTransform");
-      }
-      if (!producer.isCompositeNode()) {
-        // Verify that primitive transform outputs are already computed.
-        getPValue(value);
-      }
-    }
-
-    /**
-     * Sets the value of the given PValue.
-     * Throws an exception if the PValue's value has already been set.
-     */
-    void setPValue(PValue pvalue, Object contents) {
-      if (store.containsKey(pvalue)) {
-        throw new IllegalStateException(
-            "internal error: setting the value of " + pvalue +
-            " more than once");
-      }
-      store.put(pvalue, contents);
-    }
-
-    /**
-     * Retrieves the value of the given PValue.
-     * Throws an exception if the PValue's value hasn't already been set.
-     */
-    Object getPValue(PValue pvalue) {
-      if (!store.containsKey(pvalue)) {
-        throw new IllegalStateException(
-            "internal error: getting the value of " + pvalue +
-            " before it has been computed");
-      }
-      return store.get(pvalue);
-    }
-
-    /**
-     * Converts a list of T to a list of {@code ValueWithMetadata<T>}, with each
-     * value placed in the global window with the minimum timestamp.
-     */
-    <T> List<ValueWithMetadata<T>> toValueWithMetadata(List<T> values) {
-      List<ValueWithMetadata<T>> result = new ArrayList<>(values.size());
-      for (T value : values) {
-        result.add(ValueWithMetadata.of(WindowedValue.valueInGlobalWindow(value)));
-      }
-      return result;
-    }
-
-    /**
-     * Converts a list of {@code WindowedValue<T>} to a list of {@code ValueWithMetadata<T>}.
-     */
-    <T> List<ValueWithMetadata<T>> toValueWithMetadataFromWindowedValue(
-        List<WindowedValue<T>> values) {
-      List<ValueWithMetadata<T>> result = new ArrayList<>(values.size());
-      for (WindowedValue<T> value : values) {
-        result.add(ValueWithMetadata.of(value));
-      }
-      return result;
-    }
-
-    @Override
-    public <T> void setPCollection(PCollection<T> pc, List<T> elements) {
-      setPCollectionValuesWithMetadata(pc, toValueWithMetadata(elements));
-    }
-
-    @Override
-    public <T> void setPCollectionWindowedValue(
-        PCollection<T> pc, List<WindowedValue<T>> elements) {
-      setPCollectionValuesWithMetadata(pc, toValueWithMetadataFromWindowedValue(elements));
-    }
-
-    @Override
-    public <T> void setPCollectionValuesWithMetadata(
-        PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
-      LOG.debug("Setting {} = {}", pc, elements);
-      ensurePCollectionEncodable(pc, elements);
-      setPValue(pc, elements);
-    }
-
-    @Override
-    public <ElemT, T, WindowedT> void setPCollectionView(
-        PCollectionView<T> view,
-        Iterable<WindowedValue<ElemT>> value) {
-      LOG.debug("Setting {} = {}", view, value);
-      setPValue(view, value);
-    }
-
-    /**
-     * Retrieves the value of the given {@link PCollection}.
-     * Throws an exception if the {@link PCollection}'s value hasn't already been set.
-     */
-    @Override
-    public <T> List<T> getPCollection(PCollection<T> pc) {
-      List<T> result = new ArrayList<>();
-      for (ValueWithMetadata<T> elem : getPCollectionValuesWithMetadata(pc)) {
-        result.add(elem.getValue());
-      }
-      return result;
-    }
-
-    @Override
-    public <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc) {
-      return Lists.transform(
-          getPCollectionValuesWithMetadata(pc),
-          new Function<ValueWithMetadata<T>, WindowedValue<T>>() {
-            @Override
-            public WindowedValue<T> apply(ValueWithMetadata<T> input) {
-              return input.getWindowedValue();
-            }});
-    }
-
-    @Override
-    public <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc) {
-      List<ValueWithMetadata<T>> elements = (List<ValueWithMetadata<T>>) getPValue(pc);
-      elements = randomizeIfUnordered(elements, false /* not inPlaceAllowed */);
-      LOG.debug("Getting {} = {}", pc, elements);
-      return elements;
-    }
-
-    @Override
-    public <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs) {
-      List<List<T>> elementsList = new ArrayList<>();
-      for (PCollection<T> pc : pcs.getAll()) {
-        elementsList.add(getPCollection(pc));
-      }
-      return elementsList;
-    }
-
-    /**
-     * Retrieves the value indicated by the given {@link PCollectionView}.
-     * Note that within the {@link DoFnContext} a {@link PCollectionView}
-     * converts from this representation to a suitable side input value.
-     */
-    @Override
-    public <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
-      Iterable<WindowedValue<?>> value = (Iterable<WindowedValue<?>>) getPValue(view);
-      LOG.debug("Getting {} = {}", view, value);
-      return value;
-    }
-
-    /**
-     * If {@code testEncodability}, ensures that the {@link PCollection}'s coder and elements are
-     * encodable and decodable by encoding them and decoding them, and returning the result.
-     * Otherwise returns the argument elements.
-     */
-    <T> List<ValueWithMetadata<T>> ensurePCollectionEncodable(
-        PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
-      ensureCoderSerializable(pc.getCoder());
-      if (!testEncodability) {
-        return elements;
-      }
-      List<ValueWithMetadata<T>> elementsCopy = new ArrayList<>(elements.size());
-      for (ValueWithMetadata<T> element : elements) {
-        elementsCopy.add(
-            element.withValue(ensureElementEncodable(pc, element.getValue())));
-      }
-      return elementsCopy;
-    }
-
-    @Override
-    public <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element) {
-      return ensureSerializableByCoder(
-          pvalue.getCoder(), element, "Within " + pvalue.toString());
-    }
-
-    @Override
-    public <T> List<T> randomizeIfUnordered(List<T> elements,
-                                            boolean inPlaceAllowed) {
-      if (!testUnorderedness) {
-        return elements;
-      }
-      List<T> elementsCopy = new ArrayList<>(elements);
-      Collections.shuffle(elementsCopy, rand);
-      return elementsCopy;
-    }
-
-    @Override
-    public <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn) {
-      if (!testSerializability) {
-        return fn;
-      }
-      return SerializableUtils.ensureSerializable(fn);
-    }
-
-    @Override
-    public <T> Coder<T> ensureCoderSerializable(Coder<T> coder) {
-      if (testSerializability) {
-        SerializableUtils.ensureSerializable(coder);
-      }
-      return coder;
-    }
-
-    @Override
-    public <T> T ensureSerializableByCoder(
-        Coder<T> coder, T value, String errorContext) {
-      if (testSerializability) {
-        return SerializableUtils.ensureSerializableByCoder(
-            coder, value, errorContext);
-      }
-      return value;
-    }
-
-    @Override
-    public CounterSet.AddCounterMutator getAddCounterMutator() {
-      return counters.getAddCounterMutator();
-    }
-
-    @Override
-    public String getStepName(PTransform<?, ?> transform) {
-      String stepName = stepNames.get(transform);
-      if (stepName == null) {
-        stepName = "s" + (stepNames.size() + 1);
-        stepNames.put(transform, stepName);
-      }
-      return stepName;
-    }
-
-    /**
-     * Returns the CounterSet generated during evaluation, which includes
-     * user-defined Aggregators and may include system-defined counters.
-     */
-    public CounterSet getCounters() {
-      return counters;
-    }
-
-    /**
-     * Returns {@link PipelineResult.State#DONE} in all situations. The
-     * Evaluator is not returned until the pipeline has been traversed, so
-     * it is either returned after a successful run or the call to run
-     * terminates abnormally.
-     */
-    @Override
-    public State getState() {
-      return State.DONE;
-    }
-
-    @Override
-    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
-      Map<String, T> stepValues = new HashMap<>();
-      for (PTransform<?, ?> step : aggregatorSteps.get(aggregator)) {
-        String stepName = String.format("user-%s-%s", stepNames.get(step), aggregator.getName());
-        String fullName = fullNames.get(step);
-        Counter<?> counter = counters.getExistingCounter(stepName);
-        if (counter == null) {
-          throw new IllegalArgumentException(
-              "Aggregator " + aggregator + " is not used in this pipeline");
-        }
-        stepValues.put(fullName, (T) counter.getAggregate());
-      }
-      return new MapAggregatorValues<>(stepValues);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * The key by which GBK groups inputs: elements are grouped by the encoded
-   * form of the key, but the original key may be accessed as well.
-   */
-  private static class GroupingKey<K> {
-    private K key;
-    private byte[] encodedKey;
-
-    public GroupingKey(K key, byte[] encodedKey) {
-      this.key = key;
-      this.encodedKey = encodedKey;
-    }
-
-    public K getKey() {
-      return key;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof GroupingKey) {
-        GroupingKey<?> that = (GroupingKey<?>) o;
-        return Arrays.equals(this.encodedKey, that.encodedKey);
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Arrays.hashCode(encodedKey);
-    }
-  }
-
-  private final DirectPipelineOptions options;
-  private boolean testSerializability;
-  private boolean testEncodability;
-  private boolean testUnorderedness;
-
-  /** Returns a new DirectPipelineRunner. */
-  private DirectPipelineRunner(DirectPipelineOptions options) {
-    this.options = options;
-    // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerStandardIOFactories(options);
-    long randomSeed;
-    if (options.getDirectPipelineRunnerRandomSeed() != null) {
-      randomSeed = options.getDirectPipelineRunnerRandomSeed();
-    } else {
-      randomSeed = new Random().nextLong();
-    }
-
-    LOG.debug("DirectPipelineRunner using random seed {}.", randomSeed);
-    rand = new Random(randomSeed);
-
-    testSerializability = options.isTestSerializability();
-    testEncodability = options.isTestEncodability();
-    testUnorderedness = options.isTestUnorderedness();
-  }
-
-  /**
-   * Returns the options used to configure this {@link DirectPipelineRunner}.
-   */
-  public DirectPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @Override
-  public String toString() {
-    return "DirectPipelineRunner#" + hashCode();
-  }
-
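-  /**
-   * Evaluates a {@link GroupByKeyOnly} transform by grouping the input
-   * elements on the encoded form of their keys, optionally randomizing the
-   * order of the values in each group.
-   */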
-  public static <K, V> void evaluateGroupByKeyOnly(
-      GroupByKeyOnly<K, V> transform,
-      EvaluationContext context) {
-    PCollection<KV<K, V>> input = context.getInput(transform);
-
-    List<ValueWithMetadata<KV<K, V>>> inputElems =
-        context.getPCollectionValuesWithMetadata(input);
-
-    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
-    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
-    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
-      K key = elem.getValue().getKey();
-      V value = elem.getValue().getValue();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            "unable to encode key " + key + " of input to " + transform +
-            " using " + keyCoder,
-            exn);
-      }
-      GroupingKey<K> groupingKey =
-          new GroupingKey<>(key, encodedKey);
-      List<V> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<V>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(value);
-    }
-
-    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
-        new ArrayList<>();
-    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
-      GroupingKey<K> groupingKey = entry.getKey();
-      K key = groupingKey.getKey();
-      List<V> values = entry.getValue();
-      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
-      outputElems.add(ValueWithMetadata
-                      .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
-                      .withKey(key));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
-                                             outputElems);
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public static <K, V> void registerGroupByKeyOnly() {
-    registerDefaultTransformEvaluator(
-        GroupByKeyOnly.class,
-        new TransformEvaluator<GroupByKeyOnly>() {
-          @Override
-          public void evaluate(
-              GroupByKeyOnly transform,
-              EvaluationContext context) {
-            evaluateGroupByKeyOnly(transform, context);
-          }
-        });
-  }
-
-  static {
-    registerGroupByKeyOnly();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunner.java
deleted file mode 100644
index 7f25183..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link PipelineRunner} can execute, translate, or otherwise process a
- * {@link Pipeline}.
- *
- * @param <ResultT> the type of the result of {@link #run}.
- */
-public abstract class PipelineRunner<ResultT extends PipelineResult> {
-
-  /**
-   * Constructs a runner from the provided options.
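-   *
-   * <p>For example (illustrative only):
-   *
-   * <pre>{@code
-   * PipelineOptions options = PipelineOptionsFactory.create();
-   * options.setRunner(DirectPipelineRunner.class);
-   * PipelineRunner<? extends PipelineResult> runner =
-   *     PipelineRunner.fromOptions(options);
-   * }</pre>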
-   *
-   * @return The newly created runner.
-   */
-  public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) {
-    Preconditions.checkNotNull(options);
-    GcsOptions gcsOptions = PipelineOptionsValidator.validate(GcsOptions.class, options);
-
-    // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerStandardIOFactories(gcsOptions);
-
-    @SuppressWarnings("unchecked")
-    PipelineRunner<? extends PipelineResult> result =
-        InstanceBuilder.ofType(PipelineRunner.class)
-        .fromClass(options.getRunner())
-        .fromFactoryMethod("fromOptions")
-        .withArg(PipelineOptions.class, options)
-        .build();
-    return result;
-  }
-
-  /**
-   * Processes the given Pipeline, returning the results.
-   */
-  public abstract ResultT run(Pipeline pipeline);
-
-  /**
-   * Applies a transform to the given input, returning the output.
-   *
-   * <p>The default implementation calls PTransform.apply(input), but can be overridden
-   * to customize behavior for a particular runner.
-   */
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    return transform.apply(input);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunnerRegistrar.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunnerRegistrar.java
deleted file mode 100644
index 1acd71f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/PipelineRunnerRegistrar.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.auto.service.AutoService;
-import java.util.ServiceLoader;
-
-/**
- * Creators of a {@link PipelineRunner} can have it automatically registered with
- * this SDK by providing a {@link ServiceLoader} entry and a concrete
- * implementation of this interface.
- *
- * <p>Note that automatic registration of any
- * {@link com.google.cloud.dataflow.sdk.options.PipelineOptions} requires that
- * each {@link PipelineRunner}'s {@link Class#getSimpleName() simple name} be
- * unique.
- *
- * <p>It is optional but recommended to use one of the many build-time tools such as
- * {@link AutoService} to generate the necessary META-INF files automatically.
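- *
- * <p>A hypothetical registrar (annotated with
- * {@code @AutoService(PipelineRunnerRegistrar.class)} so the META-INF entry is
- * generated, and using Guava's {@code ImmutableList}) might look like:
- *
- * <pre>{@code
- * public class MyRunnerRegistrar implements PipelineRunnerRegistrar {
- *   public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- *     return ImmutableList.<Class<? extends PipelineRunner<?>>>of(MyRunner.class);
- *   }
- * }
- * }</pre>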
- */
-public interface PipelineRunnerRegistrar {
-  /**
-   * Get the set of {@link PipelineRunner PipelineRunners} to register.
-   */
-  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners();
-}
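
A registrar is typically a one-liner when paired with AutoService, as the javadoc
above recommends. The sketch below assumes the hypothetical MyRunner from the
previous sketch; @AutoService generates the META-INF/services entry at compile time.

    import com.google.auto.service.AutoService;
    import com.google.common.collect.ImmutableList;

    // Hypothetical registrar for the hypothetical MyRunner.
    @AutoService(PipelineRunnerRegistrar.class)
    public class MyRunnerRegistrar implements PipelineRunnerRegistrar {
      @Override
      public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
        return ImmutableList.<Class<? extends PipelineRunner<?>>>of(MyRunner.class);
      }
    }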

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/RecordingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/RecordingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/RecordingPipelineVisitor.java
deleted file mode 100644
index ee9e8df..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/RecordingPipelineVisitor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides a simple {@link com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor}
- * that records the transformation tree.
- *
- * <p>Provided for internal unit tests.
- */
-public class RecordingPipelineVisitor implements Pipeline.PipelineVisitor {
-
-  public final List<PTransform<?, ?>> transforms = new ArrayList<>();
-  public final List<PValue> values = new ArrayList<>();
-
-  @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-  }
-
-  @Override
-  public void visitTransform(TransformTreeNode node) {
-    transforms.add(node.getTransform());
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    values.add(value);
-  }
-}
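
In a test, the visitor is driven by Pipeline.traverseTopologically, which invokes
the four callbacks above in topological order. A minimal sketch (VisitorDemo is a
hypothetical name):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;

    public class VisitorDemo {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        p.apply(Create.of(1, 2, 3));

        RecordingPipelineVisitor visitor = new RecordingPipelineVisitor();
        p.traverseTopologically(visitor);  // drives the four callbacks above

        System.out.println(visitor.transforms.size() + " transforms, "
            + visitor.values.size() + " values recorded");
      }
    }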

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformHierarchy.java
deleted file mode 100644
index 23687c4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformHierarchy.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Preconditions;
-
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Captures information about a collection of transformations and their
- * associated {@link PValue}s.
- */
-public class TransformHierarchy {
-  private final Deque<TransformTreeNode> transformStack = new LinkedList<>();
-  private final Map<PInput, TransformTreeNode> producingTransformNode = new HashMap<>();
-
-  /**
-   * Creates a {@code TransformHierarchy} containing a root node.
-   */
-  public TransformHierarchy() {
-    // First element in the stack is the root node, holding all child nodes.
-    transformStack.add(new TransformTreeNode(null, null, "", null));
-  }
-
-  /**
-   * Returns the TransformTreeNode on top of the stack.
-   */
-  public TransformTreeNode getCurrent() {
-    return transformStack.peek();
-  }
-
-  /**
-   * Pushes a TransformTreeNode onto the stack.
-   */
-  public void pushNode(TransformTreeNode current) {
-    transformStack.push(current);
-  }
-
-  /**
-   * Pops the top TransformTreeNode off the stack.
-   */
-  public void popNode() {
-    transformStack.pop();
-    Preconditions.checkState(!transformStack.isEmpty());
-  }
-
-  /**
-   * Adds an input to the given node.
-   *
-   * <p>This forces each input's producing node to finish specifying.
-   */
-  public void addInput(TransformTreeNode node, PInput input) {
-    for (PValue i : input.expand()) {
-      TransformTreeNode producer = producingTransformNode.get(i);
-      if (producer == null) {
-        throw new IllegalStateException("Producer unknown for input: " + i);
-      }
-
-      producer.finishSpecifying();
-      node.addInputProducer(i, producer);
-    }
-  }
-
-  /**
-   * Sets the output of a transform node.
-   */
-  public void setOutput(TransformTreeNode producer, POutput output) {
-    producer.setOutput(output);
-
-    for (PValue o : output.expand()) {
-      producingTransformNode.put(o, producer);
-    }
-  }
-
-  /**
-   * Visits all nodes in the transform hierarchy, in topological order.
-   */
-  public void visit(Pipeline.PipelineVisitor visitor,
-                    Set<PValue> visitedNodes) {
-    transformStack.peekFirst().visit(visitor, visitedNodes);
-  }
-}
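
The push/addInput/setOutput/pop sequence above mirrors what Pipeline does around
each transform application. A schematic fragment, not runnable on its own:
transform, input, and output are assumed to be in scope, with the producers of
input already registered via setOutput.

    TransformHierarchy hierarchy = new TransformHierarchy();
    TransformTreeNode parent = hierarchy.getCurrent();  // the root node
    TransformTreeNode node =
        new TransformTreeNode(parent, transform, "MyStep", input);
    parent.addComposite(node);

    hierarchy.pushNode(node);          // nested applies now attach beneath node
    hierarchy.addInput(node, input);   // forces each producer to finish specifying
    // ... expand the transform, applying any sub-transforms ...
    hierarchy.setOutput(node, output); // records node as producer of the outputs
    hierarchy.popNode();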

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformTreeNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformTreeNode.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformTreeNode.java
deleted file mode 100644
index 60d308f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/TransformTreeNode.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Preconditions;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Provides internal tracking of transform relationships with helper methods
- * for initialization and ordered visitation.
- */
-public class TransformTreeNode {
-  private final TransformTreeNode enclosingNode;
-
-  // The PTransform for this node, which may be a composite PTransform.
-  // The root of a TransformHierarchy is represented as a TransformTreeNode
-  // with a null transform field.
-  private final PTransform<?, ?> transform;
-
-  private final String fullName;
-
-  // Nodes for sub-transforms of a composite transform.
-  private final Collection<TransformTreeNode> parts = new ArrayList<>();
-
-  // Inputs to the transform, in expanded form and mapped to the producer
-  // of the input.
-  private final Map<PValue, TransformTreeNode> inputs = new HashMap<>();
-
-  // Input to the transform, in unexpanded form.
-  private final PInput input;
-
-  // TODO: track which outputs need to be exported to parent.
-  // Output of the transform, in unexpanded form.
-  private POutput output;
-
-  private boolean finishedSpecifying = false;
-
-  /**
-   * Creates a new TransformTreeNode with the given parent and transform.
-   *
-   * <p>EnclosingNode and transform may both be null for
-   * a root-level node, which holds all other nodes.
-   *
-   * @param enclosingNode the composite node containing this node
-   * @param transform the PTransform tracked by this node
-   * @param fullName the fully qualified name of the transform
-   * @param input the unexpanded input to the transform
-   */
-  public TransformTreeNode(@Nullable TransformTreeNode enclosingNode,
-                           @Nullable PTransform<?, ?> transform,
-                           String fullName,
-                           @Nullable PInput input) {
-    this.enclosingNode = enclosingNode;
-    this.transform = transform;
-    Preconditions.checkArgument((enclosingNode == null && transform == null)
-        || (enclosingNode != null && transform != null),
-        "EnclosingNode and transform must both be specified, or both be null");
-    this.fullName = fullName;
-    this.input = input;
-  }
-
-  /**
-   * Returns the transform associated with this transform node.
-   */
-  public PTransform<?, ?> getTransform() {
-    return transform;
-  }
-
-  /**
-   * Returns the enclosing composite transform node, or null if there is none.
-   */
-  public TransformTreeNode getEnclosingNode() {
-    return enclosingNode;
-  }
-
-  /**
-   * Adds a composite operation to the transform node.
-   *
-   * <p>As soon as a node is added, the transform node is considered a
-   * composite operation instead of a primitive transform.
-   */
-  public void addComposite(TransformTreeNode node) {
-    parts.add(node);
-  }
-
-  /**
-   * Returns true if this node represents a composite transform that does not perform
-   * processing of its own, but merely encapsulates a sub-pipeline (which may be empty).
-   *
-   * <p>Note that a node may be composite with no sub-transforms if it returns its input directly,
-   * extracts a component of a tuple, or performs other operations at pipeline assembly time.
-   */
-  public boolean isCompositeNode() {
-    return !parts.isEmpty() || returnsOthersOutput() || isRootNode();
-  }
-
-  private boolean returnsOthersOutput() {
-    PTransform<?, ?> transform = getTransform();
-    for (PValue output : getExpandedOutputs()) {
-      if (!output.getProducingTransformInternal().getTransform().equals(transform)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public boolean isRootNode() {
-    return transform == null;
-  }
-
-  public String getFullName() {
-    return fullName;
-  }
-
-  /**
-   * Adds an input and its producing node to the transform node.
-   */
-  public void addInputProducer(PValue expandedInput, TransformTreeNode producer) {
-    Preconditions.checkState(!finishedSpecifying);
-    inputs.put(expandedInput, producer);
-  }
-
-  /**
-   * Returns the transform input, in unexpanded form.
-   */
-  public PInput getInput() {
-    return input;
-  }
-
-  /**
-   * Returns a mapping of inputs to the producing nodes for all inputs to
-   * the transform.
-   */
-  public Map<PValue, TransformTreeNode> getInputs() {
-    return Collections.unmodifiableMap(inputs);
-  }
-
-  /**
-   * Sets the output of the transform node.
-   */
-  public void setOutput(POutput output) {
-    Preconditions.checkState(!finishedSpecifying);
-    Preconditions.checkState(this.output == null);
-    this.output = output;
-  }
-
-  /**
-   * Returns the transform output, in unexpanded form.
-   */
-  public POutput getOutput() {
-    return output;
-  }
-
-  /**
-   * Returns the transform outputs, in expanded form.
-   */
-  public Collection<? extends PValue> getExpandedOutputs() {
-    if (output != null) {
-      return output.expand();
-    } else {
-      return Collections.emptyList();
-    }
-  }
-
-  /**
-   * Visits the transform node.
-   *
-   * <p>Provides an ordered visit of the input values, the primitive
-   * transform (or child nodes for composite transforms), then the
-   * output values.
-   */
-  public void visit(Pipeline.PipelineVisitor visitor,
-                    Set<PValue> visitedValues) {
-    if (!finishedSpecifying) {
-      finishSpecifying();
-    }
-
-    // Visit inputs.
-    for (Map.Entry<PValue, TransformTreeNode> entry : inputs.entrySet()) {
-      if (visitedValues.add(entry.getKey())) {
-        visitor.visitValue(entry.getKey(), entry.getValue());
-      }
-    }
-
-    if (isCompositeNode()) {
-      visitor.enterCompositeTransform(this);
-      for (TransformTreeNode child : parts) {
-        child.visit(visitor, visitedValues);
-      }
-      visitor.leaveCompositeTransform(this);
-    } else {
-      visitor.visitTransform(this);
-    }
-
-    // Visit outputs.
-    for (PValue pValue : getExpandedOutputs()) {
-      if (visitedValues.add(pValue)) {
-        visitor.visitValue(pValue, this);
-      }
-    }
-  }
-
-  /**
-   * Finishes specifying a transform.
-   *
-   * <p>All inputs are finished first, then the transform, then
-   * all outputs.
-   */
-  public void finishSpecifying() {
-    if (finishedSpecifying) {
-      return;
-    }
-    finishedSpecifying = true;
-
-    for (TransformTreeNode input : inputs.values()) {
-      if (input != null) {
-        input.finishSpecifying();
-      }
-    }
-
-    if (output != null) {
-      output.finishSpecifyingOutput();
-    }
-  }
-}
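
The enter/leave ordering of visit() lends itself to an indentation-based rendering
of the transform tree. A sketch of such a visitor (PrintingVisitor is a
hypothetical name; the PipelineVisitor methods are those shown earlier):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.values.PValue;

    public class PrintingVisitor implements Pipeline.PipelineVisitor {
      private int depth = 0;

      @Override
      public void enterCompositeTransform(TransformTreeNode node) {
        System.out.println(indent() + "composite: " + node.getFullName());
        depth++;
      }

      @Override
      public void leaveCompositeTransform(TransformTreeNode node) {
        depth--;
      }

      @Override
      public void visitTransform(TransformTreeNode node) {
        System.out.println(indent() + "primitive: " + node.getFullName());
      }

      @Override
      public void visitValue(PValue value, TransformTreeNode producer) {}

      private String indent() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < depth; i++) {
          sb.append("  ");
        }
        return sb.toString();
      }
    }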

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
deleted file mode 100644
index 6342725..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
- */
-abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
-  @Override
-  public void beforeElement(WindowedValue<T> element) {}
-
-  @Override
-  public void afterElement(WindowedValue<T> element) {}
-
-  @Override
-  public void afterFinish(
-      CommittedBundle<T> input,
-      InProcessTransformResult result,
-      Iterable<? extends CommittedBundle<?>> outputs) {}
-}
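
A concrete enforcement overrides only the hooks it needs; the empty defaults above
absorb the rest. A sketch (ElementCountEnforcement is a hypothetical name, and must
live in the same inprocess package because the base class is package-private; the
imports are those of the file above):

    // Hypothetical enforcement that counts elements passing through a bundle.
    class ElementCountEnforcement<T> extends AbstractModelEnforcement<T> {
      private long count = 0;

      @Override
      public void afterElement(WindowedValue<T> element) {
        count++;
      }

      @Override
      public void afterFinish(
          CommittedBundle<T> input,
          InProcessTransformResult result,
          Iterable<? extends CommittedBundle<?>> outputs) {
        System.out.println("processed " + count + " elements");
      }
    }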

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AvroIOShardedWriteFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
deleted file mode 100644
index 49576e5..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-
-class AvroIOShardedWriteFactory implements PTransformOverrideFactory {
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (transform instanceof AvroIO.Write.Bound) {
-      @SuppressWarnings("unchecked")
-      AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) transform;
-      if (originalWrite.getNumShards() > 1
-          || (originalWrite.getNumShards() == 1
-              && !"".equals(originalWrite.getShardNameTemplate()))) {
-        @SuppressWarnings("unchecked")
-        PTransform<InputT, OutputT> override =
-            (PTransform<InputT, OutputT>) new AvroIOShardedWrite<InputT>(originalWrite);
-        return override;
-      }
-    }
-    return transform;
-  }
-
-  private class AvroIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
-    private final AvroIO.Write.Bound<InputT> initial;
-
-    private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) {
-      this.initial = initial;
-    }
-
-    @Override
-    int getNumShards() {
-      return initial.getNumShards();
-    }
-
-    @Override
-    PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
-      String shardName =
-          IOChannelUtils.constructName(
-              initial.getFilenamePrefix(),
-              initial.getShardNameTemplate(),
-              initial.getFilenameSuffix(),
-              shardNum,
-              getNumShards());
-      return initial.withoutSharding().to(shardName).withSuffix("");
-    }
-
-    @Override
-    protected PTransform<PCollection<InputT>, PDone> delegate() {
-      return initial;
-    }
-  }
-}
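
The per-shard filenames produced by getSingleShardTransform follow the standard
shard name template expansion. A sketch of what IOChannelUtils.constructName yields
(the output path is hypothetical):

    import com.google.cloud.dataflow.sdk.util.IOChannelUtils;

    String shardName = IOChannelUtils.constructName(
        "/tmp/output",      // filename prefix
        "-SSSSS-of-NNNNN",  // shard name template
        ".avro",            // filename suffix
        2,                  // this shard's number
        5);                 // total number of shards
    // shardName is "/tmp/output-00002-of-00005.avro"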

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
deleted file mode 100644
index f034e2f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
-import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
- * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
- */
-final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure data is not read multiple times.
-   * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is
-   * retriggered.
-   */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
-      throws IOException {
-    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
-  }
-
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    BoundedReadEvaluator<?> evaluator =
-        getTransformEvaluatorQueue(transform, evaluationContext).poll();
-    if (evaluator == null) {
-      return EmptyTransformEvaluator.create(transform);
-    }
-    return evaluator;
-  }
-
-  /**
-   * Gets the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Bounded Read.Bounded}, initializing it if required.
-   *
-   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
-   * already done so.
-   */
-  @SuppressWarnings("unchecked")
-  private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-    if (evaluatorQueue == null) {
-      evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
-        // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
-        // factory for this transform
-        BoundedSource<OutputT> source = transform.getTransform().getSource();
-        BoundedReadEvaluator<OutputT> evaluator =
-            new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source);
-        evaluatorQueue.offer(evaluator);
-      } else {
-        // otherwise return the existing Queue that arrived before us
-        evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-      }
-    }
-    return evaluatorQueue;
-  }
-
-  /**
-   * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
-   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
-   * creates the {@link BoundedReader} and consumes all available input.
-   *
-   * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
-   * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
-   * may produce duplicate elements.
-   */
-  private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
-    private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
-    /**
-     * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same
-     * as the source derived from {@link #transform} due to splitting.
-     */
-    private BoundedSource<OutputT> source;
-
-    public BoundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext,
-        BoundedSource<OutputT> source) {
-      this.transform = transform;
-      this.evaluationContext = evaluationContext;
-      this.source = source;
-    }
-
-    @Override
-    public void processElement(WindowedValue<Object> element) {}
-
-    @Override
-    public InProcessTransformResult finishBundle() throws IOException {
-      try (final BoundedReader<OutputT> reader =
-              source.createReader(evaluationContext.getPipelineOptions())) {
-        boolean contentsRemaining = reader.start();
-        UncommittedBundle<OutputT> output =
-            evaluationContext.createRootBundle(transform.getOutput());
-        while (contentsRemaining) {
-          output.add(
-              WindowedValue.timestampedValueInGlobalWindow(
-                  reader.getCurrent(), reader.getCurrentTimestamp()));
-          contentsRemaining = reader.advance();
-        }
-        return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
-            .addOutput(output)
-            .build();
-      }
-    }
-  }
-}
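
The finishBundle() drain loop above follows the standard BoundedReader protocol:
start() positions the reader at the first element, advance() moves to each
subsequent one, and both return false once input is exhausted. A standalone sketch,
assuming a BoundedSource<String> named source and a PipelineOptions named options
are in scope, inside a method that declares IOException:

    try (BoundedSource.BoundedReader<String> reader = source.createReader(options)) {
      for (boolean more = reader.start(); more; more = reader.advance()) {
        System.out.println(reader.getCurrent() + " @ " + reader.getCurrentTimestamp());
      }
    }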

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
deleted file mode 100644
index cb8a369..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * A factory that creates {@link UncommittedBundle UncommittedBundles}.
- */
-public interface BundleFactory {
-  /**
-   * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to
-   * the {@code output} {@link PCollection}.
-   */
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
-
-  /**
-   * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle
-   * belong to the {@code output} {@link PCollection}.
-   */
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output);
-
-  /**
-   * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by
-   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
-   * belong to the {@code output} {@link PCollection}.
-   */
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output);
-}
-
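
Taken together, the factory methods above define the bundle lifecycle: create an
UncommittedBundle, add elements, then commit it. A sketch, assuming the add/commit
signatures on InProcessPipelineRunner.UncommittedBundle, with a BundleFactory named
bundleFactory and a PCollection<String> named outputs in scope (Instant is
org.joda.time.Instant):

    UncommittedBundle<String> bundle = bundleFactory.createRootBundle(outputs);
    bundle.add(WindowedValue.valueInGlobalWindow("hello"));
    CommittedBundle<String> committed = bundle.commit(Instant.now());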