You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:47 UTC

[23/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java
deleted file mode 100644
index a74e5bf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Create.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.TimestampedValue;
-import com.google.cloud.dataflow.sdk.values.TimestampedValue.TimestampedValueCoder;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * {@code Create<T>} takes a collection of elements of type {@code T}
- * known when the pipeline is constructed and returns a
- * {@code PCollection<T>} containing the elements.
- *
- * <p>Example of use:
- * <pre> {@code
- * Pipeline p = ...;
- *
- * PCollection<Integer> pc = p.apply(Create.of(3, 4, 5).withCoder(BigEndianIntegerCoder.of()));
- *
- * Map<String, Integer> map = ...;
- * PCollection<KV<String, Integer>> pt =
- *     p.apply(Create.of(map)
- *      .withCoder(KvCoder.of(StringUtf8Coder.of(),
- *                            BigEndianIntegerCoder.of())));
- * } </pre>
- *
- * <p>{@code Create} can automatically determine the {@code Coder} to use
- * if all elements have the same run-time class, and a default coder is registered for that
- * class. See {@link CoderRegistry} for details on how defaults are determined.
- *
- * <p>If a coder cannot be inferred, {@link Create.Values#withCoder} must be called
- * explicitly to set the encoding of the resulting
- * {@code PCollection}.
- *
- * <p>A good use for {@code Create} is when a {@code PCollection}
- * needs to be created without dependencies on files or other external
- * entities.  This is especially useful during testing.
- *
- * <p>Caveat: {@code Create} only supports small in-memory datasets,
- * particularly when submitting jobs to the Google Cloud Dataflow
- * service.
- *
- * @param <T> the type of the elements of the resulting {@code PCollection}
- */
-public class Create<T> {
-  /**
-   * Returns a new {@code Create.Values} transform that produces a
-   * {@link PCollection} containing elements of the provided
-   * {@code Iterable}.
-   *
-   * <p>The argument should not be modified after this is called.
-   *
-   * <p>The elements of the output {@link PCollection} will have a timestamp of negative infinity,
-   * see {@link Create#timestamped} for a way of creating a {@code PCollection} with timestamped
-   * elements.
-   *
-   * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
-   * if all elements have the same non-parameterized run-time class, and a default coder is
-   * registered for that class. See {@link CoderRegistry} for details on how defaults are
-   * determined.
-   * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
-   */
-  public static <T> Values<T> of(Iterable<T> elems) {
-    // No coder is supplied here; Values infers one unless withCoder() is called later.
-    Optional<Coder<T>> unspecifiedCoder = Optional.absent();
-    return new Values<>(elems, unspecifiedCoder);
-  }
-
-  /**
-   * Returns a new {@code Create.Values} transform that produces a
-   * {@link PCollection} containing the specified elements.
-   *
-   * <p>The elements will have a timestamp of negative infinity, see
-   * {@link Create#timestamped} for a way of creating a {@code PCollection}
-   * with timestamped elements.
-   *
-   * <p>The arguments should not be modified after this is called.
-   *
-   * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
-   * if all elements have the same non-parameterized run-time class, and a default coder is
-   * registered for that class. See {@link CoderRegistry} for details on how defaults are
-   * determined.
-   * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
-   */
-  @SafeVarargs
-  public static <T> Values<T> of(T... elems) {
-    // Arrays.asList wraps the caller's array rather than copying it.
-    List<T> elemList = Arrays.asList(elems);
-    return of(elemList);
-  }
-
-  /**
-   * Returns a new {@code Create.Values} transform that produces a
-   * {@link PCollection} of {@link KV}s corresponding to the keys and
-   * values of the specified {@code Map}.
-   *
-   * <p>The elements will have a timestamp of negative infinity, see
-   * {@link Create#timestamped} for a way of creating a {@code PCollection}
-   * with timestamped elements.
-   *
-   * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
-   * if all elements have the same non-parameterized run-time class, and a default coder is
-   * registered for that class. See {@link CoderRegistry} for details on how defaults are
-   * determined.
-   * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly.
-   */
-  public static <K, V> Values<KV<K, V>> of(Map<K, V> elems) {
-    // Produce one KV element per map entry, in entrySet() iteration order.
-    List<KV<K, V>> pairs = new ArrayList<>(elems.size());
-    Iterator<Map.Entry<K, V>> entries = elems.entrySet().iterator();
-    while (entries.hasNext()) {
-      Map.Entry<K, V> current = entries.next();
-      pairs.add(KV.of(current.getKey(), current.getValue()));
-    }
-    return of(pairs);
-  }
-
-  /**
-   * Returns a new {@link Create.TimestampedValues} transform that produces a
-   * {@link PCollection} containing the elements of the provided {@code Iterable}
-   * with the specified timestamps.
-   *
-   * <p>The argument should not be modified after this is called.
-   *
-   * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
-   * to use if all elements have the same non-parameterized run-time class, and a default coder is
-   * registered for that class. See {@link CoderRegistry} for details on how defaults are
-   * determined.
-   * Otherwise, use {@link Create.TimestampedValues#withCoder} to set the coder explicitly.
-   */
-  public static <T> TimestampedValues<T> timestamped(Iterable<TimestampedValue<T>> elems) {
-    // No coder is supplied here; it is inferred unless withCoder() is called later.
-    Optional<Coder<T>> unspecifiedCoder = Optional.absent();
-    return new TimestampedValues<>(elems, unspecifiedCoder);
-  }
-
-  /**
-   * Returns a new {@link Create.TimestampedValues} transform that produces a {@link PCollection}
-   * containing the specified elements with the specified timestamps.
-   *
-   * <p>The arguments should not be modified after this is called.
-   *
-   * <p>This simply wraps the arguments in a list and delegates to
-   * {@link #timestamped(Iterable)}, so the same coder inference applies; use
-   * {@link Create.TimestampedValues#withCoder} to set the coder explicitly.
-   *
-   * @param elems the timestamped elements of the resulting {@code PCollection}
-   * @return a transform producing the values of {@code elems} with their timestamps
-   */
-  @SafeVarargs
-  public static <T> TimestampedValues<T> timestamped(TimestampedValue<T>... elems) {
-    // @SafeVarargs already covers the generic-varargs warning, so the parameter needs no
-    // @SuppressWarnings("unchecked"); this mirrors Create.of(T...).
-    return timestamped(Arrays.asList(elems));
-  }
-
-  /**
-   * Returns a new root transform that produces a {@link PCollection} containing
-   * the specified elements with the specified timestamps.
-   *
-   * <p>The arguments should not be modified after this is called.
-   *
-   * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
-   * to use if all elements have the same non-parameterized run-time class, and a default coder
-   * is registered for that class. See {@link CoderRegistry} for details on how defaults are
-   * determined.
-   * Otherwise, use {@link Create.TimestampedValues#withCoder} to set the coder explicitly.
-   *
-   * @throws IllegalArgumentException if there are a different number of values
-   * and timestamps
-   */
-  public static <T> TimestampedValues<T> timestamped(
-      Iterable<T> values, Iterable<Long> timestamps) {
-    List<TimestampedValue<T>> paired = new ArrayList<>();
-    Iterator<T> vals = values.iterator();
-    Iterator<Long> stamps = timestamps.iterator();
-    // Walk both sequences in lock-step until at least one of them runs out.
-    while (true) {
-      if (!vals.hasNext() || !stamps.hasNext()) {
-        break;
-      }
-      T value = vals.next();
-      // The timestamp is passed on as a boxed Long, exactly as the iterator yields it.
-      Instant when = new Instant(stamps.next());
-      paired.add(TimestampedValue.of(value, when));
-    }
-    // If either iterator still has items, the two inputs had different lengths.
-    boolean bothExhausted = !(vals.hasNext() || stamps.hasNext());
-    Preconditions.checkArgument(
-        bothExhausted,
-        "Expect sizes of values and timestamps are same.");
-    return timestamped(paired);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@code PTransform} that creates a {@code PCollection} from a set of in-memory objects.
-   */
-  public static class Values<T> extends PTransform<PInput, PCollection<T>> {
-    /**
-     * Returns a {@link Create.Values} PTransform like this one that uses the given
-     * {@code Coder<T>} to decode each of the objects into a
-     * value of type {@code T}.
-     *
-     * <p>By default, {@code Create.Values} can automatically determine the {@code Coder} to use
-     * if all elements have the same non-parameterized run-time class, and a default coder is
-     * registered for that class. See {@link CoderRegistry} for details on how defaults are
-     * determined.
-     *
-     * <p>Note that for {@link Create.Values} with no elements, the {@link VoidCoder} is used.
-     */
-    public Values<T> withCoder(Coder<T> coder) {
-      // Fields are final, so a new transform over the same elements carries the coder.
-      Optional<Coder<T>> explicitCoder = Optional.of(coder);
-      return new Values<>(elems, explicitCoder);
-    }
-
-    /**
-     * Returns the {@code Iterable} of elements held by this transform; no copy is made.
-     */
-    public Iterable<T> getElements() {
-      return elems;
-    }
-
-    @Override
-    public PCollection<T> apply(PInput input) {
-      try {
-        // Resolve the coder first so a failure surfaces before the output is created.
-        Coder<T> outputCoder = getDefaultOutputCoder(input);
-        PCollection<T> output = PCollection.<T>createPrimitiveOutputInternal(
-            input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-        return output.setCoder(outputCoder);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
-            + "Please set a coder by invoking Create.withCoder() explicitly.", e);
-      }
-    }
-
-    /**
-     * Returns the coder for the output {@code PCollection}: the explicitly configured coder
-     * if one was given via {@link #withCoder}, otherwise one inferred from the elements.
-     *
-     * <p>Inference first asks the pipeline's {@link CoderRegistry} for the default coder of the
-     * elements' common run-time class (null elements are ignored; with no non-null elements
-     * {@code Void.class} is used), provided that class declares no type parameters. If that
-     * does not yield a coder, each element is handed to the registry individually and all
-     * elements must agree on a single coder.
-     *
-     * @param input the input of this transform, used to reach the pipeline's coder registry
-     * @return the explicit coder if present, otherwise the inferred coder
-     * @throws CannotProvideCoderException if the elements have differing classes, default to
-     *     differing coders, or no coder could be inferred at all
-     */
-    @Override
-    public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException {
-      if (coder.isPresent()) {
-        return coder.get();
-      }
-      // First try to deduce a coder using the types of the elements.
-      Class<?> elementClazz = Void.class;
-      for (T elem : elems) {
-        if (elem == null) {
-          continue;
-        }
-        Class<?> clazz = elem.getClass();
-        if (elementClazz.equals(Void.class)) {
-          elementClazz = clazz;
-        } else if (!elementClazz.equals(clazz)) {
-          // Elements are not the same type, require a user-specified coder.
-          throw new CannotProvideCoderException(
-              "Cannot provide coder for Create: The elements are not all of the same class.");
-        }
-      }
-
-      if (elementClazz.getTypeParameters().length == 0) {
-        try {
-          @SuppressWarnings("unchecked") // elementClazz is a wildcard type
-          Coder<T> classCoder = (Coder<T>) input.getPipeline().getCoderRegistry()
-              .getDefaultCoder(TypeDescriptor.of(elementClazz));
-          return classCoder;
-        } catch (CannotProvideCoderException ignored) {
-          // No default coder for the class itself; let the per-element stage below try.
-        }
-      }
-
-      // If that fails, try to deduce a coder using the elements themselves.
-      // (Named differently from the `coder` field to avoid shadowing it.)
-      Optional<Coder<T>> inferredCoder = Optional.absent();
-      for (T elem : elems) {
-        Coder<T> c = input.getPipeline().getCoderRegistry().getDefaultCoder(elem);
-        if (!inferredCoder.isPresent()) {
-          inferredCoder = Optional.of(c);
-        } else if (!Objects.equals(c, inferredCoder.get())) {
-          throw new CannotProvideCoderException(
-              "Cannot provide coder for elements of " + Create.class.getSimpleName() + ":"
-              + " For their common class, no coder could be provided."
-              + " Based on their values, they do not all default to the same Coder.");
-        }
-      }
-
-      if (!inferredCoder.isPresent()) {
-        // Only reachable when there were no elements to inspect; name the class that was tried.
-        throw new CannotProvideCoderException("Unable to infer a coder. Please register "
-            + "a coder for " + elementClazz.getName()
-            + " or set one explicitly with withCoder().");
-      }
-      return inferredCoder.get();
-    }
-
-    /////////////////////////////////////////////////////////////////////////////
-
-    /** The elements of the resulting PCollection. */
-    private final transient Iterable<T> elems;
-
-    /** The coder used to encode the values to and from a binary representation. */
-    private final transient Optional<Coder<T>> coder;
-
-    /**
-     * Constructs a {@code Create.Values} transform that produces a
-     * {@link PCollection} containing the specified elements.
-     *
-     * <p>The arguments should not be modified after this is called.
-     *
-     * @param elems the elements of the resulting {@code PCollection}; stored as given, not copied
-     * @param coder the coder to use, or absent to have one inferred from the elements
-     */
-    private Values(Iterable<T> elems, Optional<Coder<T>> coder) {
-      this.elems = elems;
-      this.coder = coder;
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@code PTransform} that creates a {@code PCollection} whose elements have
-   * associated timestamps.
-   */
-  public static class TimestampedValues<T> extends Values<T> {
-    /**
-     * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
-     * {@code Coder<T>} to decode each of the objects into a
-     * value of type {@code T}.
-     *
-     * <p>By default, {@code Create.TimestampedValues} can automatically determine the
-     * {@code Coder} to use if all elements have the same non-parameterized run-time class,
-     * and a default coder is registered for that class. See {@link CoderRegistry} for details
-     * on how defaults are determined.
-     *
-     * <p>Note that for {@link Create.TimestampedValues} with no elements, the {@link VoidCoder}
-     * is used.
-     *
-     * @param coder the coder for the (un-timestamped) values of type {@code T}
-     * @return a new transform over the same timestamped elements using {@code coder}
-     */
-    @Override
-    public TimestampedValues<T> withCoder(Coder<T> coder) {
-      Optional<Coder<T>> explicitCoder = Optional.of(coder);
-      return new TimestampedValues<>(elems, explicitCoder);
-    }
-
-    /**
-     * Creates the timestamped elements as an intermediate {@code PCollection} of
-     * {@link TimestampedValue}s, then re-emits each value with its own timestamp.
-     *
-     * @param input the input this transform is applied to
-     * @return a {@code PCollection} of the values, with the resolved coder set on it
-     * @throws IllegalArgumentException if no coder was specified and none can be inferred
-     */
-    @Override
-    public PCollection<T> apply(PInput input) {
-      try {
-        Coder<T> coder = getDefaultOutputCoder(input);
-        // Step 1: materialize (value, timestamp) pairs using a coder built from the value coder.
-        PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input,
-            Create.of(elems).withCoder(TimestampedValueCoder.of(coder)));
-
-        // Step 2: unwrap each pair, emitting the value at its recorded timestamp.
-        PCollection<T> output = intermediate.apply(ParDo.of(new ConvertTimestamps<T>()));
-        output.setCoder(coder);
-        return output;
-      } catch (CannotProvideCoderException e) {
-        // Name the method users can actually call; there is no "CreateTimestamped" class.
-        throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
-            + "Please set a coder by invoking Create.TimestampedValues.withCoder() "
-            + "explicitly.", e);
-      }
-    }
-
-    /////////////////////////////////////////////////////////////////////////////
-
-    /** The timestamped elements of the resulting PCollection. */
-    private final transient Iterable<TimestampedValue<T>> elems;
-
-    private TimestampedValues(Iterable<TimestampedValue<T>> elems,
-        Optional<Coder<T>> coder) {
-      // The superclass sees only the bare values; the timestamped originals are kept here.
-      super(stripTimestamps(elems), coder);
-      this.elems = elems;
-    }
-
-    /** Returns a view of {@code timestamped} that exposes only each element's value. */
-    private static <T> Iterable<T> stripTimestamps(Iterable<TimestampedValue<T>> timestamped) {
-      Function<TimestampedValue<T>, T> toValue = new Function<TimestampedValue<T>, T>() {
-        @Override
-        public T apply(TimestampedValue<T> input) {
-          return input.getValue();
-        }
-      };
-      return Iterables.transform(timestamped, toValue);
-    }
-
-    /** Emits the value of each {@link TimestampedValue} at that element's own timestamp. */
-    private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
-      @Override
-      public void processElement(ProcessContext c) {
-        TimestampedValue<T> stamped = c.element();
-        c.outputWithTimestamp(stamped.getValue(), stamped.getTimestamp());
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    registerDefaultTransformEvaluator();
-  }
-
-  /**
-   * Registers with {@link DirectPipelineRunner} the evaluator used for
-   * {@link Create.Values} transforms; the evaluator simply delegates to
-   * {@code evaluateHelper}. Invoked from this class's static initializer.
-   */
-  // Raw types: the registration is keyed on the class literal Create.Values.class.
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private static void registerDefaultTransformEvaluator() {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        Create.Values.class,
-        new DirectPipelineRunner.TransformEvaluator<Create.Values>() {
-          @Override
-          public void evaluate(
-              Create.Values transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            // Hand off to a generic method so the element type gets a name (T) again.
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  /**
-   * Evaluates a {@link Create.Values} transform for the direct runner: every element is passed
-   * through {@code context.ensureElementEncodable} and the results are set as the contents of
-   * the transform's output.
-   */
-  private static <T> void evaluateHelper(
-      Create.Values<T> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    Iterable<T> source = transform.elems;
-    // Pre-size the list when the element count is known up front.
-    List<T> encodable = (source instanceof Collection)
-        ? new ArrayList<T>(((Collection<T>) source).size())
-        : new ArrayList<T>();
-    for (T elem : source) {
-      encodable.add(context.ensureElementEncodable(context.getOutput(transform), elem));
-    }
-    context.setPCollection(context.getOutput(transform), encodable);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
deleted file mode 100644
index 5ba9992..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFn.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
-import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output.  Unit testing of a {@code DoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFnWithContext} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
-  /**
-   * Information accessible to all methods in this {@code DoFn}.
-   * Used primarily to output elements.
-   */
-  public abstract class Context {
-
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
-     * invoking this {@code DoFn}.  The {@code PipelineOptions} will
-     * be the default when running via {@link DoFnTester}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Adds the given element to the main output {@code PCollection}.
-     *
-     * <p>Once passed to {@code output} the element should be considered
-     * immutable and not be modified in any way. It may be cached or retained
-     * by the Dataflow runtime or later steps in the pipeline, or used in
-     * other unspecified ways.
-     *
-     * <p>If invoked from {@link DoFn#processElement processElement}, the output
-     * element will have the same timestamp and be in the same windows
-     * as the input element passed to {@link DoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Adds the given element to the main output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Adds the given element to the side output {@code PCollection} with the
-     * given tag.
-     *
-     * <p>Once passed to {@code sideOutput} the element should not be modified
-     * in any way.
-     *
-     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
-     * specify the tags of side outputs that it consumes. Non-consumed side
-     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
-     * need to be specified.
-     *
-     * <p>The output element will have the same timestamp and be in the same
-     * windows as the input element passed to {@link DoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-    /**
-     * Adds the given element to the specified side output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutputWithTimestamp(
-        TupleTag<T> tag, T output, Instant timestamp);
-
-    /**
-     * Creates an {@link Aggregator} in the {@link DoFn} context with the
-     * specified name and aggregation logic specified by {@link CombineFn}.
-     *
-     * <p>For internal use only.
-     *
-     * @param name the name of the aggregator
-     * @param combiner the {@link CombineFn} to use in the aggregator
-     * @return an aggregator for the provided name and {@link CombineFn} in this
-     *         context
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
-    /**
-     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are
-     * usable within this context.
-     *
-     * <p>This method should be called by runners before {@link DoFn#startBundle}
-     * is executed.
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected final void setupDelegateAggregators() {
-      // Give every aggregator registered on the enclosing DoFn a delegate from this context.
-      for (Map.Entry<String, DelegatingAggregator<?, ?>> entry : aggregators.entrySet()) {
-        setupDelegateAggregator(entry.getValue());
-      }
-
-      // Record on the enclosing DoFn that its set of aggregators is now final.
-      aggregatorsAreFinal = true;
-    }
-
-    /** Creates the context-specific aggregator for {@code aggregator} and installs it as delegate. */
-    private final <AggInputT, AggOutputT> void setupDelegateAggregator(
-        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-      aggregator.setDelegate(
-          createAggregatorInternal(aggregator.getName(), aggregator.getCombineFn()));
-    }
-  }
-
-  /**
-   * Information accessible when running {@link DoFn#processElement}.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Returns the input element to be processed.
-     *
-     * <p>The element should be considered immutable. The Dataflow runtime will not mutate the
-     * element, so it is safe to cache, etc. The element should not be mutated by any of the
-     * {@link DoFn} methods, because it may be cached elsewhere, retained by the Dataflow runtime,
-     * or used in other unspecified ways.
-     */
-    public abstract InputT element();
-
-    /**
-     * Returns the value of the side input for the window corresponding to the
-     * window of the main input element.
-     *
-     * <p>See
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn#getSideInputWindow}
-     * for how this corresponding window is determined.
-     *
-     * @throws IllegalArgumentException if this is not a side input
-     * @see ParDo#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Returns the timestamp of the input element.
-     *
-     * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns the window into which the input element has been assigned.
-     *
-     * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}
-     * for more information.
-     *
-     * @throws UnsupportedOperationException if this {@link DoFn} does
-     * not implement {@link RequiresWindowAccess}.
-     */
-    public abstract BoundedWindow window();
-
-    /**
-     * Returns information about the pane within this window into which the
-     * input element has been assigned.
-     *
-     * <p>Generally all data is in a single, uninteresting pane unless custom
-     * triggering and/or late data has been explicitly requested.
-     * See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract PaneInfo pane();
-
-    /**
-     * Returns the process context to use for implementing windowing.
-     */
-    @Experimental
-    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link DoFn.Context#outputWithTimestamp}.
-   *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward into the future.  For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
-   *
-   * <p>Note that producing an element whose timestamp is less than the
-   * current timestamp may result in late data, i.e. returning a non-zero
-   * value here does not impact watermark calculations used for firing
-   * windows.
-   *
-   * @return the maximum backward shift permitted; {@code Duration.ZERO} unless overridden
-   * @deprecated does not interact well with the watermark.
-   */
-  @Deprecated
-  public Duration getAllowedTimestampSkew() {
-    return Duration.ZERO;
-  }
-
-  /**
-   * Interface for signaling that a {@link DoFn} needs to access the window the
-   * element is being processed in, via {@link DoFn.ProcessContext#window}.
-   *
-   * <p>This is a marker interface: it declares no methods.
-   */
-  @Experimental
-  public interface RequiresWindowAccess {}
-
-  /**
-   * Creates a {@code DoFn} whose aggregator map starts out empty.
-   */
-  public DoFn() {
-    this(new HashMap<String, DelegatingAggregator<?, ?>>());
-  }
-
-  /**
-   * Creates a {@code DoFn} that uses the given aggregator map. The map is stored as-is
-   * (not copied), so the caller and this {@code DoFn} share the same instance.
-   *
-   * @param aggregators the name-to-aggregator map backing this {@code DoFn}
-   */
-  DoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
-    this.aggregators = aggregators;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
-  /**
-   * Protects aggregators from being created after initialization.
-   */
-  private boolean aggregatorsAreFinal;
-
-  /**
-   * Prepares this {@code DoFn} instance for processing a batch of elements.
-   *
-   * <p>By default, does nothing.
-   *
-   * @param c the {@link Context} supplied for this call
-   * @throws Exception declared so that overriding implementations may throw
-   */
-  public void startBundle(Context c) throws Exception {
-  }
-
-  /**
-   * Processes one input element.
-   *
-   * <p>The current element of the input {@code PCollection} is returned by
-   * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Dataflow
-   * runtime will not mutate the element, so it is safe to cache, etc. The element should not be
-   * mutated by any of the {@link DoFn} methods, because it may be cached elsewhere, retained by the
-   * Dataflow runtime, or used in other unspecified ways.
-   *
-   * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
-   * Once passed to {@code output} the element should be considered immutable and not be modified in
-   * any way. It may be cached elsewhere, retained by the Dataflow runtime, or used in other
-   * unspecified ways.
-   *
-   * @param c the {@link ProcessContext} giving access to the current element and the outputs
-   * @throws Exception declared so that implementations may throw
-   * @see ProcessContext
-   */
-  public abstract void processElement(ProcessContext c) throws Exception;
-
-  /**
-   * Finishes processing this batch of elements.
-   *
-   * <p>By default, does nothing.
-   *
-   * @param c the {@link Context} supplied for this call
-   * @throws Exception declared so that overriding implementations may throw
-   */
-  public void finishBundle(Context c) throws Exception {
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display metadata.
-   *
-   * @param builder the {@link DisplayData.Builder} passed in by the caller; unused by this
-   *     default implementation
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Describes what is statically known about the input type of this
-   * {@code DoFn}, as seen from the most-derived class of this instance.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   *
-   * @return a {@link TypeDescriptor} for {@code InputT} built from {@link #getClass()}
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    Class<?> mostDerivedClass = getClass();
-    return new TypeDescriptor<InputT>(mostDerivedClass) {};
-  }
-
-  /**
-   * Describes what is statically known about the output type of this
-   * {@code DoFn}, as seen from the most-derived class of this instance.
-   *
-   * <p>For a concrete {@code DoFn} subclass that declares no generic type
-   * parameters of its own (anonymous inner classes included), the result is
-   * a complete non-generic type, which is good for choosing a default output
-   * {@code Coder<OutputT>} for the output {@code PCollection<OutputT>}.
-   *
-   * @return a {@link TypeDescriptor} for {@code OutputT} built from {@link #getClass()}
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    Class<?> mostDerivedClass = getClass();
-    return new TypeDescriptor<OutputT>(mostDerivedClass) {};
-  }
-
-  /**
-   * Registers and returns a new {@link Aggregator} whose aggregation logic is the
-   * given {@link CombineFn}. Names must be unique among the {@link Aggregator}s of
-   * this DoFn, and aggregators can only be created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-
-    // The name-collision check deliberately runs before the state check.
-    boolean nameIsTaken = aggregators.containsKey(name);
-    checkArgument(!nameIsTaken,
-        "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
-        name);
-
-    boolean stillAcceptingAggregators = !aggregatorsAreFinal;
-    checkState(stillAcceptingAggregators, "Cannot create an aggregator during DoFn processing."
-        + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> registered =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, registered);
-    return registered;
-  }
-
-  /**
-   * Registers and returns a new {@link Aggregator} whose aggregation logic is the
-   * given {@link SerializableFunction}, wrapped via {@code Combine.IterableCombineFn.of}.
-   * Names must be unique among the {@link Aggregator}s of this DoFn, and aggregators
-   * can only be created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
-      SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    CombineFn<AggInputT, ?, AggInputT> iterableCombineFn =
-        Combine.IterableCombineFn.of(combiner);
-    return createAggregator(name, iterableCombineFn);
-  }
-
-  /**
-   * Returns an unmodifiable view of the {@link Aggregator Aggregators} created by
-   * this {@code DoFn}.
-   */
-  Collection<Aggregator<?, ?>> getAggregators() {
-    Collection<DelegatingAggregator<?, ?>> registered = aggregators.values();
-    return Collections.<Aggregator<?, ?>>unmodifiableCollection(registered);
-  }
-
-  /**
-   * An {@link Aggregator} that delegates calls to addValue to another
-   * aggregator.
-   *
-   * @param <AggInputT> the type of input element
-   * @param <AggOutputT> the type of output element
-   */
-  static class DelegatingAggregator<AggInputT, AggOutputT> implements
-      Aggregator<AggInputT, AggOutputT>, Serializable {
-    private final UUID id;
-
-    private final String name;
-
-    private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
-    private Aggregator<AggInputT, ?> delegate;
-
-    public DelegatingAggregator(String name,
-        CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-      this.id = UUID.randomUUID();
-      this.name = checkNotNull(name, "name cannot be null");
-      // Safe contravariant cast
-      @SuppressWarnings("unchecked")
-      CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
-          (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
-      this.combineFn = specificCombiner;
-    }
-
-    @Override
-    public void addValue(AggInputT value) {
-      if (delegate == null) {
-        throw new IllegalStateException(
-            "addValue cannot be called on Aggregator outside of the execution of a DoFn.");
-      } else {
-        delegate.addValue(value);
-      }
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
-      return combineFn;
-    }
-
-    /**
-     * Sets the current delegate of the Aggregator.
-     *
-     * @param delegate the delegate to set in this aggregator
-     */
-    public void setDelegate(Aggregator<AggInputT, ?> delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("name", name)
-          .add("combineFn", combineFn)
-          .toString();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(id, name, combineFn.getClass());
-    }
-
-    /**
-     * Indicates whether some other object is "equal to" this one.
-     *
-     * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
-     * CombineFns are the same class, and they have identical IDs.
-     */
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o == null) {
-        return false;
-      }
-      if (o instanceof DelegatingAggregator) {
-        DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
-        return Objects.equals(this.id, that.id)
-            && Objects.equals(this.name, that.name)
-            && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
-      }
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
deleted file mode 100644
index 1c46541..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
+++ /dev/null
@@ -1,668 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.ExtraContextFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.FinishBundle;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.ProcessElement;
-import com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.StartBundle;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.common.ReflectHelpers;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.reflect.TypeParameter;
-import com.google.common.reflect.TypeToken;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Utility implementing the necessary reflection for working with {@link DoFnWithContext}s.
- */
-public abstract class DoFnReflector {
-
-  private interface ExtraContextInfo {
-    /**
-     * Create an instance of the extra context argument using the given factory.
-     *
-     * @param factory the {@link DoFnWithContext.ExtraContextFactory} to obtain the argument from
-     * @return the object to pass as the extra context argument
-     */
-    <InputT, OutputT> Object createInstance(
-        DoFnWithContext.ExtraContextFactory<InputT, OutputT> factory);
-
-    /**
-     * Create the type token for the given type, filling in the generics.
-     *
-     * @param in token for the input type
-     * @param out token for the output type
-     * @return the token of the extra context argument's type
-     */
-    <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out);
-  }
-
-  /** Extra context arguments accepted by bundle methods; currently none. */
-  private static final Map<Class<?>, ExtraContextInfo> EXTRA_CONTEXTS = Collections.emptyMap();
-  /**
-   * Extra context arguments accepted by the {@link ProcessElement} method: everything in
-   * {@link #EXTRA_CONTEXTS} plus {@link BoundedWindow} and {@link WindowingInternals},
-   * keyed by the raw class of the argument.
-   */
-  private static final Map<Class<?>, ExtraContextInfo> EXTRA_PROCESS_CONTEXTS =
-      ImmutableMap.<Class<?>, ExtraContextInfo>builder()
-      .putAll(EXTRA_CONTEXTS)
-      .put(BoundedWindow.class, new ExtraContextInfo() {
-        @Override
-        public <InputT, OutputT> Object
-            createInstance(ExtraContextFactory<InputT, OutputT> factory) {
-          return factory.window();
-        }
-
-        @Override
-        public <InputT, OutputT> TypeToken<?>
-            tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
-          // BoundedWindow has no generics to fill in.
-          return TypeToken.of(BoundedWindow.class);
-        }
-      })
-      .put(WindowingInternals.class, new ExtraContextInfo() {
-        @Override
-        public <InputT, OutputT> Object
-            createInstance(ExtraContextFactory<InputT, OutputT> factory) {
-          return factory.windowingInternals();
-        }
-
-        @Override
-        public <InputT, OutputT> TypeToken<?>
-            tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
-          // Substitute the supplied input/output tokens for the InputT/OutputT type variables.
-          return new TypeToken<WindowingInternals<InputT, OutputT>>() {
-            }
-          .where(new TypeParameter<InputT>() {}, in)
-          .where(new TypeParameter<OutputT>() {}, out);
-        }
-      })
-      .build();
-
-  /**
-   * Reports whether the reflected {@link DoFnWithContext} needs the window of the element
-   * being processed. {@link #toDoFn} uses the answer to pick an adapter that implements
-   * {@link DoFn.RequiresWindowAccess}.
-   *
-   * @return true if the reflected {@link DoFnWithContext} uses a Single Window.
-   */
-  public abstract boolean usesSingleWindow();
-
-  /**
-   * Invoke the reflected {@link ProcessElement} method on the given instance.
-   *
-   * @param fn an instance of the {@link DoFnWithContext} to invoke {@link ProcessElement} on.
-   * @param c the {@link com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.ProcessContext}
-   *     to pass to {@link ProcessElement}.
-   * @param extra the {@link ExtraContextFactory} from which any additional context arguments
-   *     of the method are obtained.
-   */
-  abstract <InputT, OutputT> void invokeProcessElement(
-      DoFnWithContext<InputT, OutputT> fn,
-      DoFnWithContext<InputT, OutputT>.ProcessContext c,
-      ExtraContextFactory<InputT, OutputT> extra);
-
-  /**
-   * Invoke the reflected {@link StartBundle} method on the given instance.
-   *
-   * <p>This base implementation only calls {@code fn.prepareForProcessing()}; it does not
-   * use {@code c} or {@code extra}. Subclasses that invoke a reflected method should call
-   * {@code super.invokeStartBundle} so that step still runs.
-   *
-   * @param fn an instance of the {@link DoFnWithContext} to invoke {@link StartBundle} on.
-   * @param c the {@link com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.Context}
-   *     to pass to {@link StartBundle}.
-   * @param extra the {@link ExtraContextFactory} from which any additional context arguments
-   *     of the method are obtained.
-   */
-  <InputT, OutputT> void invokeStartBundle(
-     DoFnWithContext<InputT, OutputT> fn,
-     DoFnWithContext<InputT, OutputT>.Context c,
-     ExtraContextFactory<InputT, OutputT> extra) {
-    fn.prepareForProcessing();
-  }
-
-  /**
-   * Invoke the reflected {@link FinishBundle} method on the given instance.
-   *
-   * @param fn an instance of the {@link DoFnWithContext} to invoke {@link FinishBundle} on.
-   * @param c the {@link com.google.cloud.dataflow.sdk.transforms.DoFnWithContext.Context}
-   *     to pass to {@link FinishBundle}.
-   * @param extra the {@link ExtraContextFactory} from which any additional context arguments
-   *     of the method are obtained.
-   */
-  abstract <InputT, OutputT> void invokeFinishBundle(
-      DoFnWithContext<InputT, OutputT> fn,
-      DoFnWithContext<InputT, OutputT>.Context c,
-      ExtraContextFactory<InputT, OutputT> extra);
-
-  /**
-   * Cache of reflectors keyed by {@link DoFnWithContext} class. {@link LinkedHashMap} is not
-   * thread-safe, so this map must only be read and written from the synchronized
-   * {@link #of} method.
-   */
-  private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE =
-      new LinkedHashMap<Class<?>, DoFnReflector>();
-
-  /**
-   * Looks up, or creates and caches, the reflector for the given class.
-   *
-   * <p>The method is synchronized because it can be reached from several threads at once
-   * (for example while adapters are being deserialized), and unsynchronized concurrent
-   * mutation of the backing map could corrupt it.
-   *
-   * @param fn the {@link DoFnWithContext} class to reflect on
-   * @return the {@link DoFnReflector} for the given {@link DoFnWithContext}.
-   * @throws IllegalStateException if the class's annotated methods fail validation
-   */
-  public static synchronized DoFnReflector of(
-      @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) {
-    DoFnReflector reflector = REFLECTOR_CACHE.get(fn);
-    if (reflector != null) {
-      return reflector;
-    }
-
-    reflector = new GenericDoFnReflector(fn);
-    REFLECTOR_CACHE.put(fn, reflector);
-    return reflector;
-  }
-
-  /**
-   * Create a {@link DoFn} that wraps the given {@link DoFnWithContext} and invokes it
-   * through this reflector.
-   *
-   * <p>When {@link #usesSingleWindow()} is true the returned adapter also implements
-   * {@link DoFn.RequiresWindowAccess}.
-   *
-   * @param fn the {@link DoFnWithContext} to adapt
-   * @return a {@link DoFn} delegating to {@code fn}
-   */
-  public <InputT, OutputT> DoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
-    if (!usesSingleWindow()) {
-      return new SimpleDoFnAdapter<InputT, OutputT>(this, fn);
-    }
-    return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
-  }
-
-  /** Renders the type behind the token using {@code ReflectHelpers.TYPE_SIMPLE_DESCRIPTION}. */
-  private static String formatType(TypeToken<?> t) {
-    Type underlyingType = t.getType();
-    return ReflectHelpers.TYPE_SIMPLE_DESCRIPTION.apply(underlyingType);
-  }
-
-  /** Renders the method using {@code ReflectHelpers.CLASS_AND_METHOD_FORMATTER}. */
-  private static String format(Method m) {
-    String description = ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(m);
-    return description;
-  }
-
-  /**
-   * Lists the formatted types of every supported extra context argument, sorted
-   * case-insensitively, with generics filled in from {@code in} and {@code out}.
-   */
-  private static Collection<String> describeSupportedTypes(
-      Map<Class<?>, ExtraContextInfo> extraProcessContexts,
-      final TypeToken<?> in, final TypeToken<?> out) {
-    Function<ExtraContextInfo, String> describeContext =
-        new Function<ExtraContextInfo, String>() {
-          @Override
-          @Nullable
-          public String apply(@Nullable ExtraContextInfo info) {
-            return (info == null) ? null : formatType(info.tokenFor(in, out));
-          }
-        };
-    return FluentIterable
-        .from(extraProcessContexts.values())
-        .transform(describeContext)
-        .toSortedSet(String.CASE_INSENSITIVE_ORDER);
-  }
-
-  /**
-   * Verifies the arguments of a {@link ProcessElement} method: a
-   * {@code DoFnWithContext.ProcessContext} first, followed by any of the extra process
-   * contexts.
-   */
-  @VisibleForTesting
-  static <InputT, OutputT> ExtraContextInfo[] verifyProcessMethodArguments(Method m) {
-    TypeToken<DoFnWithContext<InputT, OutputT>.ProcessContext> processContextType =
-        new TypeToken<DoFnWithContext<InputT, OutputT>.ProcessContext>() {
-          };
-    return verifyMethodArguments(m,
-        EXTRA_PROCESS_CONTEXTS,
-        processContextType,
-        new TypeParameter<InputT>() {},
-        new TypeParameter<OutputT>() {});
-  }
-
-  /**
-   * Verifies the arguments of a {@link StartBundle} or {@link FinishBundle} method: a
-   * {@code DoFnWithContext.Context} first, followed by any of the extra bundle contexts.
-   */
-  @VisibleForTesting
-  static <InputT, OutputT> ExtraContextInfo[] verifyBundleMethodArguments(Method m) {
-    TypeToken<DoFnWithContext<InputT, OutputT>.Context> bundleContextType =
-        new TypeToken<DoFnWithContext<InputT, OutputT>.Context>() {
-          };
-    return verifyMethodArguments(m,
-        EXTRA_CONTEXTS,
-        bundleContextType,
-        new TypeParameter<InputT>() {},
-        new TypeParameter<OutputT>() {});
-  }
-
-  /**
-   * Verify the method arguments for a given {@link DoFnWithContext} method.
-   *
-   * <p>The requirements for a method to be valid, are:
-   * <ol>
-   * <li>The method returns {@code void} and is not a varargs method.
-   * <li>The method has at least one argument.
-   * <li>The first argument is of type firstContextArg, declared with the type parameters
-   *     of its enclosing {@link DoFnWithContext} (not as a raw type).
-   * <li>The remaining arguments have raw types that appear in {@code contexts}
-   * <li>Any generics on the extra context arguments match what is expected. Eg.,
-   *     {@code WindowingInternals<InputT, OutputT>} either matches the
-   *     {@code InputT} and {@code OutputT} parameters of the
-   *     {@code DoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc.
-   * </ol>
-   *
-   * @param m the method to verify
-   * @param contexts mapping from raw classes to the {@link ExtraContextInfo} used
-   *     to create new instances.
-   * @param firstContextArg the expected type of the first context argument
-   * @param iParam TypeParameter representing the input type
-   * @param oParam TypeParameter representing the output type
-   * @return the {@link ExtraContextInfo} for each argument after the first, in declaration
-   *     order
-   * @throws IllegalStateException if the method violates any of the requirements above
-   */
-  @VisibleForTesting static <InputT, OutputT> ExtraContextInfo[] verifyMethodArguments(Method m,
-      Map<Class<?>, ExtraContextInfo> contexts,
-      TypeToken<?> firstContextArg, TypeParameter<InputT> iParam, TypeParameter<OutputT> oParam) {
-
-    if (!void.class.equals(m.getReturnType())) {
-      throw new IllegalStateException(String.format(
-          "%s must have a void return type", format(m)));
-    }
-    if (m.isVarArgs()) {
-      throw new IllegalStateException(String.format(
-          "%s must not have var args", format(m)));
-    }
-
-    // The first parameter must be present, and must be the specified type
-    Type[] params = m.getGenericParameterTypes();
-    TypeToken<?> contextToken = null;
-    if (params.length > 0) {
-      contextToken = TypeToken.of(params[0]);
-    }
-    if (contextToken == null
-        || !contextToken.getRawType().equals(firstContextArg.getRawType())) {
-      throw new IllegalStateException(String.format(
-          "%s must take a %s as its first argument",
-          format(m), firstContextArg.getRawType().getSimpleName()));
-    }
-    ExtraContextInfo[] contextInfos = new ExtraContextInfo[params.length - 1];
-
-    // Resolve the actual input and output types from the context argument. We want the
-    // owner type, since ProcessContext and Context are owned by DoFnWithContext. A context
-    // argument declared as a raw type carries no such information, so it is rejected here
-    // with a descriptive error instead of failing on a cast.
-    Type contextType = contextToken.getType();
-    Type ownerType = (contextType instanceof ParameterizedType)
-        ? ((ParameterizedType) contextType).getOwnerType()
-        : null;
-    if (!(ownerType instanceof ParameterizedType)) {
-      throw new IllegalStateException(String.format(
-          "%s must declare its %s argument with the type parameters of its enclosing class",
-          format(m), firstContextArg.getRawType().getSimpleName()));
-    }
-    Type[] ownerTypeArgs = ((ParameterizedType) ownerType).getActualTypeArguments();
-    @SuppressWarnings("unchecked")
-    TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(ownerTypeArgs[0]);
-    @SuppressWarnings("unchecked")
-    TypeToken<OutputT> oActual = (TypeToken<OutputT>) TypeToken.of(ownerTypeArgs[1]);
-
-    // Each remaining parameter must have a raw type registered in contexts, and generics
-    // compatible with the resolved input and output types.
-    for (int i = 1; i < params.length; i++) {
-      TypeToken<?> param = TypeToken.of(params[i]);
-
-      ExtraContextInfo info = contexts.get(param.getRawType());
-      if (info == null) {
-        throw new IllegalStateException(String.format(
-            "%s is not a valid context parameter for method %s. Should be one of %s",
-            formatType(param), format(m),
-            describeSupportedTypes(contexts, iActual, oActual)));
-      }
-
-      // If we get here, the class matches, but maybe the generics don't:
-      TypeToken<?> expected = info.tokenFor(iActual, oActual);
-      if (!expected.isSubtypeOf(param)) {
-        throw new IllegalStateException(String.format(
-            "Incompatible generics in context parameter %s for method %s. Should be %s",
-            formatType(param), format(m), formatType(expected)));
-      }
-
-      // Register the (now validated) context info
-      contextInfos[i - 1] = info;
-    }
-    return contextInfos;
-  }
-
-  /**
-   * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFnWithContext}.
-   */
-  private static class GenericDoFnReflector extends DoFnReflector {
-
-    private Method startBundle;
-    private Method processElement;
-    private Method finishBundle;
-    private ExtraContextInfo[] processElementArgs;
-    private ExtraContextInfo[] startBundleArgs;
-    private ExtraContextInfo[] finishBundleArgs;
-
-    /**
-     * Locates the annotated methods of {@code fn} and validates their arguments.
-     *
-     * <p>A {@link ProcessElement} method is required; {@link StartBundle} and
-     * {@link FinishBundle} methods are optional, and their fields stay {@code null} when
-     * the class does not declare them.
-     *
-     * @param fn the class to reflect on
-     * @throws IllegalStateException if a required method is missing or any located method
-     *     fails validation
-     */
-    private GenericDoFnReflector(Class<?> fn) {
-      // Locate the annotated methods
-      this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true);
-      this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false);
-      this.finishBundle = findAnnotatedMethod(FinishBundle.class, fn, false);
-
-      // Verify that their method arguments satisfy our conditions.
-      processElementArgs = verifyProcessMethodArguments(processElement);
-      if (startBundle != null) {
-        startBundleArgs = verifyBundleMethodArguments(startBundle);
-      }
-      if (finishBundle != null) {
-        finishBundleArgs = verifyBundleMethodArguments(finishBundle);
-      }
-    }
-
-    /**
-     * Collects every method carrying {@code anno} that is declared on {@code startClass} or
-     * one of its superclasses below {@code stopClass}, followed by annotated methods found
-     * on the interfaces those classes implement.
-     */
-    private static Collection<Method> declaredMethodsWithAnnotation(
-        Class<? extends Annotation> anno,
-        Class<?> startClass, Class<?> stopClass) {
-      Collection<Method> annotated = new ArrayList<>();
-      LinkedHashSet<Class<?>> seenInterfaces = new LinkedHashSet<>();
-
-      // Walk the class chain from startClass upwards, stopping before stopClass.
-      for (Class<?> current = startClass;
-          current != null && !current.equals(stopClass);
-          current = current.getSuperclass()) {
-        for (Method declared : current.getDeclaredMethods()) {
-          if (declared.isAnnotationPresent(anno)) {
-            annotated.add(declared);
-          }
-        }
-        Collections.addAll(seenInterfaces, current.getInterfaces());
-      }
-
-      // Then consider the methods reachable through the interfaces seen on the way up.
-      for (Method inherited : ReflectHelpers.getClosureOfMethodsOnInterfaces(seenInterfaces)) {
-        if (inherited.isAnnotationPresent(anno)) {
-          annotated.add(inherited);
-        }
-      }
-      return annotated;
-    }
-
-    /**
-     * Finds the single method annotated with {@code anno} on {@code fnClazz}.
-     *
-     * @return the method, made accessible, or {@code null} when none exists and
-     *     {@code required} is false
-     * @throws IllegalStateException if a required method is missing, if differently-shaped
-     *     methods carry the annotation, or if the method is not public or is static
-     */
-    private static Method findAnnotatedMethod(
-        Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
-      Collection<Method> candidates = declaredMethodsWithAnnotation(
-          anno, fnClazz, DoFnWithContext.class);
-
-      if (candidates.isEmpty()) {
-        if (!required) {
-          return null;
-        }
-        throw new IllegalStateException(String.format(
-            "No method annotated with @%s found in %s",
-            anno.getSimpleName(), fnClazz.getName()));
-      }
-
-      // With at least one candidate, every other candidate must share its name and
-      // parameter types, i.e. be the same method seen again on a parent class.
-      Method chosen = candidates.iterator().next();
-      for (Method candidate : candidates) {
-        boolean sameSignature = chosen.getName().equals(candidate.getName())
-            && Arrays.equals(chosen.getParameterTypes(), candidate.getParameterTypes());
-        if (!sameSignature) {
-          throw new IllegalStateException(String.format(
-              "Found multiple methods annotated with @%s. [%s] and [%s]",
-              anno.getSimpleName(), format(chosen), format(candidate)));
-        }
-      }
-
-      // We need to be able to call it. We require it is public.
-      int modifiers = chosen.getModifiers();
-      if (!Modifier.isPublic(modifiers)) {
-        throw new IllegalStateException(format(chosen) + " must be public");
-      }
-
-      // And make sure its not static.
-      if (Modifier.isStatic(modifiers)) {
-        throw new IllegalStateException(format(chosen) + " must not be static");
-      }
-
-      chosen.setAccessible(true);
-      return chosen;
-    }
-
-    /**
-     * Returns true when the {@link ProcessElement} method declares a {@link BoundedWindow}
-     * parameter.
-     */
-    @Override
-    public boolean usesSingleWindow() {
-      return usesContext(BoundedWindow.class);
-    }
-
-    /** Returns whether the {@link ProcessElement} method has a parameter of exactly this class. */
-    private boolean usesContext(Class<?> context) {
-      Class<?>[] declaredParameterTypes = processElement.getParameterTypes();
-      return Arrays.asList(declaredParameterTypes).contains(context);
-    }
-
-    /**
-     * Invokes the located {@link ProcessElement} method on {@code fn}, passing {@code c}
-     * followed by the extra arguments validated at construction time.
-     */
-    @Override
-    <InputT, OutputT> void invokeProcessElement(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFnWithContext<InputT, OutputT>.ProcessContext c,
-        ExtraContextFactory<InputT, OutputT> extra) {
-      invoke(processElement, fn, c, extra, processElementArgs);
-    }
-
-    /**
-     * Runs the base-class preparation step, then invokes the {@link StartBundle} method
-     * if the reflected class declares one.
-     */
-    @Override
-    <InputT, OutputT> void invokeStartBundle(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFnWithContext<InputT, OutputT>.Context c,
-        ExtraContextFactory<InputT, OutputT> extra) {
-      super.invokeStartBundle(fn, c, extra);
-      if (startBundle == null) {
-        return;
-      }
-      invoke(startBundle, fn, c, extra, startBundleArgs);
-    }
-
-    /**
-     * Invokes the {@link FinishBundle} method if the reflected class declares one;
-     * otherwise does nothing.
-     */
-    @Override
-    <InputT, OutputT> void invokeFinishBundle(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFnWithContext<InputT, OutputT>.Context c,
-        ExtraContextFactory<InputT, OutputT> extra) {
-      if (finishBundle == null) {
-        return;
-      }
-      invoke(finishBundle, fn, c, extra, finishBundleArgs);
-    }
-
-    /**
-     * Reflectively calls {@code m} on {@code on}. The first argument is {@code contextArg};
-     * each further argument is created from {@code extraArgFactory} by the matching entry
-     * of {@code extraArgs}.
-     *
-     * <p>Exceptions thrown by the invoked method are rethrown wrapped by
-     * {@link UserCodeException}; reflection failures are propagated unchecked.
-     */
-    private <InputT, OutputT> void invoke(Method m,
-        DoFnWithContext<InputT, OutputT> on,
-        DoFnWithContext<InputT, OutputT>.Context contextArg,
-        ExtraContextFactory<InputT, OutputT> extraArgFactory,
-        ExtraContextInfo[] extraArgs) {
-
-      int argCount = m.getParameterTypes().length;
-      Object[] invocationArgs = new Object[argCount];
-      invocationArgs[0] = contextArg;
-      for (int extraIndex = 0; extraIndex < argCount - 1; extraIndex++) {
-        invocationArgs[extraIndex + 1] = extraArgs[extraIndex].createInstance(extraArgFactory);
-      }
-
-      try {
-        m.invoke(on, invocationArgs);
-      } catch (InvocationTargetException userFailure) {
-        // Exception in user code.
-        throw UserCodeException.wrap(userFailure.getCause());
-      } catch (IllegalAccessException | IllegalArgumentException reflectionFailure) {
-        // Exception in our code.
-        throw Throwables.propagate(reflectionFailure);
-      }
-    }
-  }
-
-  /**
-   * Presents a {@code DoFn.Context} as a {@code DoFnWithContext.Context} by forwarding
-   * every call to the wrapped context.
-   *
-   * <p>It also acts as the {@link DoFnWithContext.ExtraContextFactory} for bundle methods;
-   * in that role {@link #window()} and {@link #windowingInternals()} always throw
-   * {@link UnsupportedOperationException}.
-   */
-  private static class ContextAdapter<InputT, OutputT>
-      extends DoFnWithContext<InputT, OutputT>.Context
-      implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
-
-    // The DoFn context that every call is forwarded to.
-    private DoFn<InputT, OutputT>.Context context;
-
-    private ContextAdapter(
-        DoFnWithContext<InputT, OutputT> fn, DoFn<InputT, OutputT>.Context context) {
-      // Context is an inner class of DoFnWithContext, so fn is its enclosing instance.
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException("Can only get the window in ProcessElements");
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the windowingInternals in ProcessElements");
-    }
-  }
-
-  /**
-   * Presents a {@code DoFn.ProcessContext} as a {@code DoFnWithContext.ProcessContext} by
-   * forwarding every call to the wrapped context.
-   *
-   * <p>It also acts as the {@link DoFnWithContext.ExtraContextFactory} for the
-   * {@link ProcessElement} method, answering {@link #window()} and
-   * {@link #windowingInternals()} from the wrapped context.
-   */
-  private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFnWithContext<InputT, OutputT>.ProcessContext
-      implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
-
-    // The DoFn process context that every call is forwarded to.
-    private DoFn<InputT, OutputT>.ProcessContext context;
-
-    private ProcessContextAdapter(
-        DoFnWithContext<InputT, OutputT> fn,
-        DoFn<InputT, OutputT>.ProcessContext context) {
-      // ProcessContext is an inner class of DoFnWithContext, so fn is its enclosing instance.
-      fn.super();
-      this.context = context;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      return context.sideInput(view);
-    }
-
-    @Override
-    public void output(OutputT output) {
-      context.output(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutput(tag, output);
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
-    }
-
-    @Override
-    public InputT element() {
-      return context.element();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return context.timestamp();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return context.pane();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return context.window();
-    }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return context.windowingInternals();
-    }
-  }
-
-  /**
-   * Returns the class of the user's function: the wrapped {@link DoFnWithContext} class
-   * when {@code fn} is an adapter created by this reflector, otherwise the class of
-   * {@code fn} itself.
-   */
-  public static Class<?> getDoFnClass(DoFn<?, ?> fn) {
-    if (!(fn instanceof SimpleDoFnAdapter)) {
-      return fn.getClass();
-    }
-    SimpleDoFnAdapter<?, ?> adapter = (SimpleDoFnAdapter<?, ?>) fn;
-    return adapter.fn.getClass();
-  }
-
-  /**
-   * A {@link DoFn} that runs a {@link DoFnWithContext} by routing each lifecycle call
-   * through a {@link DoFnReflector}.
-   *
-   * <p>The reflector is transient; it is looked up again from the wrapped function's class
-   * when the adapter is deserialized.
-   */
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends DoFn<InputT, OutputT> {
-
-    private transient DoFnReflector reflector;
-    private DoFnWithContext<InputT, OutputT> fn;
-
-    private SimpleDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
-      // Share the wrapped function's aggregators with this DoFn.
-      super(fn.aggregators);
-      this.fn = fn;
-      this.reflector = reflector;
-    }
-
-    @Override
-    public void startBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
-      // The adapter serves as both the context and the extra-context factory.
-      ContextAdapter<InputT, OutputT> bundleContext = new ContextAdapter<>(fn, c);
-      reflector.invokeStartBundle(fn, bundleContext, bundleContext);
-    }
-
-    @Override
-    public void finishBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
-      ContextAdapter<InputT, OutputT> bundleContext = new ContextAdapter<>(fn, c);
-      reflector.invokeFinishBundle(fn, bundleContext, bundleContext);
-    }
-
-    @Override
-    public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
-      ProcessContextAdapter<InputT, OutputT> elementContext =
-          new ProcessContextAdapter<>(fn, c);
-      reflector.invokeProcessElement(fn, elementContext, elementContext);
-    }
-
-    @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return fn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return fn.getOutputTypeDescriptor();
-    }
-
-    /** Restores the transient reflector after default deserialization. */
-    private void readObject(java.io.ObjectInputStream in)
-        throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-      reflector = DoFnReflector.of(fn.getClass());
-    }
-  }
-
-  /**
-   * A {@link SimpleDoFnAdapter} that additionally implements
-   * {@link DoFn.RequiresWindowAccess}. It adds no behavior of its own.
-   */
-  private static class WindowDoFnAdapter<InputT, OutputT>
-  extends SimpleDoFnAdapter<InputT, OutputT> implements DoFn.RequiresWindowAccess {
-
-    private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
-      super(reflector, fn);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java
deleted file mode 100644
index 5447664..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnTester.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.util.DirectModeExecutionContext;
-import com.google.cloud.dataflow.sdk.util.DirectSideInputReader;
-import com.google.cloud.dataflow.sdk.util.DoFnRunner;
-import com.google.cloud.dataflow.sdk.util.DoFnRunnerBase;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners;
-import com.google.cloud.dataflow.sdk.util.PTuple;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A harness for unit-testing a {@link DoFn}.
- *
- * <p>For example:
- *
- * <pre> {@code
- * DoFn<InputT, OutputT> fn = ...;
- *
- * DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn);
- *
- * // Set arguments shared across all batches:
- * fnTester.setSideInputs(...);      // If fn takes side inputs.
- * fnTester.setSideOutputTags(...);  // If fn writes to side outputs.
- *
- * // Process a batch containing a single input element:
- * Input testInput = ...;
- * List<OutputT> testOutputs = fnTester.processBatch(testInput);
- * Assert.assertThat(testOutputs,
- *                   JUnitMatchers.hasItems(...));
- *
- * // Process a bigger batch:
- * Assert.assertThat(fnTester.processBatch(i1, i2, ...),
- *                   JUnitMatchers.hasItems(...));
- * } </pre>
- *
- * @param <InputT> the type of the {@code DoFn}'s (main) input elements
- * @param <OutputT> the type of the {@code DoFn}'s (main) output elements
- */
-public class DoFnTester<InputT, OutputT> {
-  /**
-   * Returns a {@code DoFnTester} supporting unit-testing of the given
-   * {@link DoFn}.
-   */
-  @SuppressWarnings("unchecked")
-  public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
-    return new DoFnTester<InputT, OutputT>(fn);
-  }
-
-  /**
-   * Returns a {@code DoFnTester} supporting unit-testing of the given
-   * {@link DoFn}.
-   */
-  @SuppressWarnings("unchecked")
-  public static <InputT, OutputT> DoFnTester<InputT, OutputT>
-      of(DoFnWithContext<InputT, OutputT> fn) {
-    return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn));
-  }
-
-  /**
-   * Registers the tuple of values of the side input {@link PCollectionView}s to
-   * pass to the {@link DoFn} under test.
-   *
-   * <p>If needed, first creates a fresh instance of the {@link DoFn}
-   * under test.
-   *
-   * <p>If this isn't called, {@code DoFnTester} assumes the
-   * {@link DoFn} takes no side inputs.
-   */
-  public void setSideInputs(Map<PCollectionView<?>, Iterable<WindowedValue<?>>> sideInputs) {
-    this.sideInputs = sideInputs;
-    resetState();
-  }
-
-  /**
-   * Registers the values of a side input {@link PCollectionView} to
-   * pass to the {@link DoFn} under test.
-   *
-   * <p>Does not reset the tester; the value is read the next time
-   * {@link #startBundle} runs.
-   *
-   * <p>If this isn't called, {@code DoFnTester} assumes the
-   * {@code DoFn} takes no side inputs.
-   */
-  public void setSideInput(PCollectionView<?> sideInput, Iterable<WindowedValue<?>> value) {
-    sideInputs.put(sideInput, value);
-  }
-
-  /**
-   * Registers the values for a side input {@link PCollectionView} to
-   * pass to the {@link DoFn} under test. All values are placed
-   * in the global window.
-   */
-  public void setSideInputInGlobalWindow(
-      PCollectionView<?> sideInput,
-      Iterable<?> value) {
-    sideInputs.put(
-        sideInput,
-        Iterables.transform(value, new Function<Object, WindowedValue<?>>() {
-          @Override
-          public WindowedValue<?> apply(Object input) {
-            return WindowedValue.valueInGlobalWindow(input);
-          }
-        }));
-  }
-
-
-  /**
-   * Registers the list of {@code TupleTag}s that can be used by the
-   * {@code DoFn} under test to output to side output
-   * {@code PCollection}s.
-   *
-   * <p>If needed, first creates a fresh instance of the DoFn under test.
-   *
-   * <p>If this isn't called, {@code DoFnTester} assumes the
-   * {@code DoFn} doesn't emit to any side outputs.
-   */
-  public void setSideOutputTags(TupleTagList sideOutputTags) {
-    this.sideOutputTags = sideOutputTags.getAll();
-    resetState();
-  }
-
-  /**
-   * A convenience operation that first calls {@link #startBundle},
-   * then calls {@link #processElement} on each of the input elements, then
-   * calls {@link #finishBundle}, then returns the result of
-   * {@link #takeOutputElements}.
-   */
-  public List<OutputT> processBatch(Iterable <? extends InputT> inputElements) {
-    startBundle();
-    for (InputT inputElement : inputElements) {
-      processElement(inputElement);
-    }
-    finishBundle();
-    return takeOutputElements();
-  }
-
-  /**
-   * A convenience method for testing {@link DoFn DoFns} with bundles of elements.
-   * Logic proceeds as follows:
-   *
-   * <ol>
-   *   <li>Calls {@link #startBundle}.</li>
-   *   <li>Calls {@link #processElement} on each of the arguments.</li>
-   *   <li>Calls {@link #finishBundle}.</li>
-   *   <li>Returns the result of {@link #takeOutputElements}.</li>
-   * </ol>
-   */
-  @SafeVarargs
-  public final List<OutputT> processBatch(InputT... inputElements) {
-    return processBatch(Arrays.asList(inputElements));
-  }
-
-  /**
-   * Calls {@link DoFn#startBundle} on the {@code DoFn} under test.
-   *
-   * <p>If needed, first creates a fresh instance of the DoFn under test.
-   */
-  public void startBundle() {
-    resetState();
-    initializeState();
-    fnRunner.startBundle();
-    state = State.STARTED;
-  }
-
-  /**
-   * Calls {@link DoFn#processElement} on the {@code DoFn} under test, in a
-   * context where {@link DoFn.ProcessContext#element} returns the
-   * given element.
-   *
-   * <p>Will call {@link #startBundle} automatically, if it hasn't
-   * already been called.
-   *
-   * @throws IllegalStateException if the {@code DoFn} under test has already
-   * been finished
-   */
-  public void processElement(InputT element) {
-    if (state == State.FINISHED) {
-      throw new IllegalStateException("finishBundle() has already been called");
-    }
-    if (state == State.UNSTARTED) {
-      startBundle();
-    }
-    fnRunner.processElement(WindowedValue.valueInGlobalWindow(element));
-  }
-
-  /**
-   * Calls {@link DoFn#finishBundle} of the {@code DoFn} under test.
-   *
-   * <p>Will call {@link #startBundle} automatically, if it hasn't
-   * already been called.
-   *
-   * @throws IllegalStateException if the {@code DoFn} under test has already
-   * been finished
-   */
-  public void finishBundle() {
-    if (state == State.FINISHED) {
-      throw new IllegalStateException("finishBundle() has already been called");
-    }
-    if (state == State.UNSTARTED) {
-      startBundle();
-    }
-    fnRunner.finishBundle();
-    state = State.FINISHED;
-  }
-
-  /**
-   * Returns the elements output so far to the main output.  Does not
-   * clear them, so subsequent calls will continue to include these
-   * elements.
-   *
-   * @see #takeOutputElements
-   * @see #clearOutputElements
-   *
-   */
-  public List<OutputT> peekOutputElements() {
-    // TODO: Should we return an unmodifiable list?
-    return Lists.transform(
-        peekOutputElementsWithTimestamp(),
-        new Function<OutputElementWithTimestamp<OutputT>, OutputT>() {
-          @Override
-          @SuppressWarnings("unchecked")
-          public OutputT apply(OutputElementWithTimestamp<OutputT> input) {
-            return input.getValue();
-          }
-        });
-  }
-
-  /**
-   * Returns the elements output so far to the main output with associated timestamps.  Does not
-   * clear them, so subsequent calls will continue to include these
-   * elements.
-   *
-   * @see #takeOutputElementsWithTimestamp
-   * @see #clearOutputElements
-   */
-  @Experimental
-  public List<OutputElementWithTimestamp<OutputT>> peekOutputElementsWithTimestamp() {
-    // TODO: Should we return an unmodifiable list?
-    return Lists.transform(
-        outputManager.getOutput(mainOutputTag),
-        new Function<Object, OutputElementWithTimestamp<OutputT>>() {
-          @Override
-          @SuppressWarnings("unchecked")
-          public OutputElementWithTimestamp<OutputT> apply(Object input) {
-            return new OutputElementWithTimestamp<OutputT>(
-                ((WindowedValue<OutputT>) input).getValue(),
-                ((WindowedValue<OutputT>) input).getTimestamp());
-          }
-        });
-  }
-
-  /**
-   * Clears the record of the elements output so far to the main output.
-   *
-   * @see #peekOutputElements
-   */
-  public void clearOutputElements() {
-    peekOutputElements().clear();
-  }
-
-  /**
-   * Returns the elements output so far to the main output.
-   * Clears the list so these elements don't appear in future calls.
-   *
-   * @see #peekOutputElements
-   */
-  public List<OutputT> takeOutputElements() {
-    List<OutputT> resultElems = new ArrayList<>(peekOutputElements());
-    clearOutputElements();
-    return resultElems;
-  }
-
-  /**
-   * Returns the elements output so far to the main output with associated timestamps.
-   * Clears the list so these elements don't appear in future calls.
-   *
-   * @see #peekOutputElementsWithTimestamp
-   * @see #takeOutputElements
-   * @see #clearOutputElements
-   */
-  @Experimental
-  public List<OutputElementWithTimestamp<OutputT>> takeOutputElementsWithTimestamp() {
-    List<OutputElementWithTimestamp<OutputT>> resultElems =
-        new ArrayList<>(peekOutputElementsWithTimestamp());
-    clearOutputElements();
-    return resultElems;
-  }
-
-  /**
-   * Returns the elements output so far to the side output with the
-   * given tag.  Does not clear them, so subsequent calls will
-   * continue to include these elements.
-   *
-   * @see #takeSideOutputElements
-   * @see #clearSideOutputElements
-   */
-  public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
-    // TODO: Should we return an unmodifiable list?
-    return Lists.transform(
-        outputManager.getOutput(tag),
-        new Function<WindowedValue<T>, T>() {
-          @SuppressWarnings("unchecked")
-          @Override
-          public T apply(WindowedValue<T> input) {
-            return input.getValue();
-          }});
-  }
-
-  /**
-   * Clears the record of the elements output so far to the side
-   * output with the given tag.
-   *
-   * @see #peekSideOutputElements
-   */
-  public <T> void clearSideOutputElements(TupleTag<T> tag) {
-    peekSideOutputElements(tag).clear();
-  }
-
-  /**
-   * Returns the elements output so far to the side output with the given tag.
-   * Clears the list so these elements don't appear in future calls.
-   *
-   * @see #peekSideOutputElements
-   */
-  public <T> List<T> takeSideOutputElements(TupleTag<T> tag) {
-    List<T> resultElems = new ArrayList<>(peekSideOutputElements(tag));
-    clearSideOutputElements(tag);
-    return resultElems;
-  }
-
-  /**
-   * Returns the value of the provided {@link Aggregator}.
-   */
-  public <AggregateT> AggregateT getAggregatorValue(Aggregator<?, AggregateT> agg) {
-    @SuppressWarnings("unchecked")
-    Counter<AggregateT> counter =
-        (Counter<AggregateT>)
-            counterSet.getExistingCounter("user-" + STEP_NAME + "-" + agg.getName());
-    return counter.getAggregate();
-  }
-
-  /**
-   * Holder for an OutputElement along with its associated timestamp.
-   */
-  @Experimental
-  public static class OutputElementWithTimestamp<OutputT> {
-    private final OutputT value;
-    private final Instant timestamp;
-
-    OutputElementWithTimestamp(OutputT value, Instant timestamp) {
-      this.value = value;
-      this.timestamp = timestamp;
-    }
-
-    OutputT getValue() {
-      return value;
-    }
-
-    Instant getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof OutputElementWithTimestamp)) {
-        return false;
-      }
-      OutputElementWithTimestamp<?> other = (OutputElementWithTimestamp<?>) obj;
-      return Objects.equal(other.value, value) && Objects.equal(other.timestamp, timestamp);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(value, timestamp);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** The possible states of processing a DoFn. */
-  enum State {
-    UNSTARTED,
-    STARTED,
-    FINISHED
-  }
-
-  /** The name of the step of a DoFnTester. */
-  static final String STEP_NAME = "stepName";
-  /** The name of the enclosing DoFn PTransform for a DoFnTester. */
-  static final String TRANSFORM_NAME = "transformName";
-
-  final PipelineOptions options = PipelineOptionsFactory.create();
-
-  /** The original DoFn under test. */
-  final DoFn<InputT, OutputT> origFn;
-
-  /** The side input values to provide to the DoFn under test. */
-  private Map<PCollectionView<?>, Iterable<WindowedValue<?>>> sideInputs =
-      new HashMap<>();
-
-  /** The output tags used by the DoFn under test. */
-  TupleTag<OutputT> mainOutputTag = new TupleTag<>();
-  List<TupleTag<?>> sideOutputTags = new ArrayList<>();
-
-  /** A deserialized copy of the original DoFn under test, if started. */
-  DoFn<InputT, OutputT> fn;
-
-  /** The ListOutputManager to examine the outputs. */
-  DoFnRunnerBase.ListOutputManager outputManager;
-
-  /** The DoFnRunner if processing is in progress. */
-  DoFnRunner<InputT, OutputT> fnRunner;
-
-  /** Counters for user-defined Aggregators if processing is in progress. */
-  CounterSet counterSet;
-
-  /** The state of processing of the DoFn under test. */
-  State state;
-
-  DoFnTester(DoFn<InputT, OutputT> origFn) {
-    this.origFn = origFn;
-    resetState();
-  }
-
-  void resetState() {
-    fn = null;
-    outputManager = null;
-    fnRunner = null;
-    counterSet = null;
-    state = State.UNSTARTED;
-  }
-
-  @SuppressWarnings("unchecked")
-  void initializeState() {
-    fn = (DoFn<InputT, OutputT>)
-        SerializableUtils.deserializeFromByteArray(
-            SerializableUtils.serializeToByteArray(origFn),
-            origFn.toString());
-    counterSet = new CounterSet();
-    PTuple runnerSideInputs = PTuple.empty();
-    for (Map.Entry<PCollectionView<?>, Iterable<WindowedValue<?>>> entry
-        : sideInputs.entrySet()) {
-      runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue());
-    }
-    outputManager = new DoFnRunnerBase.ListOutputManager();
-    fnRunner = DoFnRunners.createDefault(
-        options,
-        fn,
-        DirectSideInputReader.of(runnerSideInputs),
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        DirectModeExecutionContext.create().getOrCreateStepContext(STEP_NAME, TRANSFORM_NAME, null),
-        counterSet.getAddCounterMutator(),
-        WindowingStrategy.globalDefault());
-  }
-}