You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:11 UTC

[03/19] incubator-beam git commit: Rename DoFnWithContext to DoFn

Rename DoFnWithContext to DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bcb6f46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bcb6f46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bcb6f46

Branch: refs/heads/master
Commit: 3bcb6f46ad0ae483d1d8785edc2d9d5846c71a73
Parents: e160966
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 22 14:10:01 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Aug 3 18:25:52 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DoFn.java    | 429 +++++++++++++++++++
 .../beam/sdk/transforms/DoFnReflector.java      |  84 ++--
 .../apache/beam/sdk/transforms/DoFnTester.java  |   2 +-
 .../beam/sdk/transforms/DoFnWithContext.java    | 429 -------------------
 .../org/apache/beam/sdk/transforms/OldDoFn.java |   2 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  16 +-
 .../beam/sdk/transforms/DoFnReflectorTest.java  |  86 ++--
 .../apache/beam/sdk/transforms/DoFnTest.java    | 237 ++++++++++
 .../sdk/transforms/DoFnWithContextTest.java     | 237 ----------
 .../apache/beam/sdk/transforms/ParDoTest.java   |  12 +-
 .../dofnreflector/DoFnReflectorTestHelper.java  |  26 +-
 .../transforms/DoFnReflectorBenchmark.java      |  30 +-
 12 files changed, 795 insertions(+), 795 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
new file mode 100644
index 0000000..eb6753c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The argument to {@link ParDo} providing the code to use to process
+ * elements of the input
+ * {@link org.apache.beam.sdk.values.PCollection}.
+ *
+ * <p>See {@link ParDo} for more explanation, examples of use, and
+ * discussion of constraints on {@code DoFn}s, including their
+ * serializability, lack of access to global shared mutable state,
+ * requirements for failure tolerance, and benefits of optimization.
+ *
+ * <p>{@code DoFn}s can be tested in a particular
+ * {@code Pipeline} by running that {@code Pipeline} on sample input
+ * and then checking its output.  Unit testing of a {@code DoFn},
+ * separately from any {@code ParDo} transform or {@code Pipeline},
+ * can be done via the {@link DoFnTester} harness.
+ *
+ * <p>Implementations must define a method annotated with {@link ProcessElement}
+ * that satisfies the requirements described there. See {@link ProcessElement}
+ * for details.
+ *
+ * <p>This functionality is experimental and likely to change.
+ *
+ * <p>Example usage:
+ *
+ * <pre> {@code
+ * PCollection<String> lines = ... ;
+ * PCollection<String> words =
+ *     lines.apply(ParDo.of(new DoFn<String, String>() {
+ *         @ProcessElement
+ *         public void processElement(ProcessContext c, BoundedWindow window) {
+ *
+ *         }}));
+ * } </pre>
+ *
+ * @param <InputT> the type of the (main) input elements
+ * @param <OutputT> the type of the (main) output elements
+ */
+@Experimental
+public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
+
+  /** Information accessible to all methods in this {@code DoFn}. */
+  public abstract class Context {
+
+    /**
+     * Returns the {@code PipelineOptions} specified with the
+     * {@link org.apache.beam.sdk.runners.PipelineRunner}
+     * invoking this {@code DoFn}.  The {@code PipelineOptions} will
+     * be the default when running via {@link DoFnTester}.
+     */
+    public abstract PipelineOptions getPipelineOptions();
+
+    /**
+     * Adds the given element to the main output {@code PCollection}.
+     *
+     * <p>Once passed to {@code output} the element should not be modified in
+     * any way.
+     *
+     * <p>If invoked from {@link ProcessElement}, the output
+     * element will have the same timestamp and be in the same windows
+     * as the input element passed to the method annotated with
+     * {@code @ProcessElement}.
+     *
+     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     */
+    public abstract void output(OutputT output);
+
+    /**
+     * Adds the given element to the main output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code outputWithTimestamp} the element should not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link ProcessElement}, the timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link DoFn#getAllowedTimestampSkew}.  The output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     */
+    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
+
+    /**
+     * Adds the given element to the side output {@code PCollection} with the
+     * given tag.
+     *
+     * <p>Once passed to {@code sideOutput} the element should not be modified
+     * in any way.
+     *
+     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to
+     * specify the tags of side outputs that it consumes. Non-consumed side
+     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
+     * need to be specified.
+     *
+     * <p>The output element will have the same timestamp and be in the same
+     * windows as the input element passed to {@link ProcessElement}.
+     *
+     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+    /**
+     * Adds the given element to the specified side output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link ProcessElement}, the timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link DoFn#getAllowedTimestampSkew}.  The output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutputWithTimestamp(
+        TupleTag<T> tag, T output, Instant timestamp);
+  }
+
+  /**
+   * Information accessible when running a method annotated with {@link ProcessElement}.
+   */
+  public abstract class ProcessContext extends Context {
+
+    /**
+     * Returns the input element to be processed.
+     *
+     * <p>The element will not be changed -- it is safe to cache, etc.
+     * without copying.
+     */
+    public abstract InputT element();
+
+
+    /**
+     * Returns the value of the side input.
+     *
+     * @throws IllegalArgumentException if this is not a side input
+     * @see ParDo#withSideInputs
+     */
+    public abstract <T> T sideInput(PCollectionView<T> view);
+
+    /**
+     * Returns the timestamp of the input element.
+     *
+     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract Instant timestamp();
+
+    /**
+     * Returns information about the pane within this window into which the
+     * input element has been assigned.
+     *
+     * <p>Generally all data is in a single, uninteresting pane unless custom
+     * triggering and/or late data has been explicitly requested.
+     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract PaneInfo pane();
+  }
+
+  /**
+   * Returns the allowed timestamp skew duration, which is the maximum
+   * duration that timestamps can be shifted backward in
+   * {@link DoFn.Context#outputWithTimestamp}.
+   *
+   * <p>The default value is {@code Duration.ZERO}, in which case
+   * timestamps can only be shifted forward into the future.  For infinite
+   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+   */
+  public Duration getAllowedTimestampSkew() {
+    return Duration.ZERO;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
+
+  /**
+   * Protects aggregators from being created after initialization.
+   */
+  private boolean aggregatorsAreFinal;
+
+  /**
+   * Returns a {@link TypeDescriptor} capturing what is known statically
+   * about the input type of this {@code DoFn} instance's most-derived
+   * class.
+   *
+   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
+   */
+  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+    return new TypeDescriptor<InputT>(getClass()) {};
+  }
+
+  /**
+   * Returns a {@link TypeDescriptor} capturing what is known statically
+   * about the output type of this {@code DoFn} instance's
+   * most-derived class.
+   *
+   * <p>In the normal case of a concrete {@code DoFn} subclass with
+   * no generic type parameters of its own (including anonymous inner
+   * classes), this will be a complete non-generic type, which is good
+   * for choosing a default output {@code Coder<OutputT>} for the output
+   * {@code PCollection<OutputT>}.
+   */
+  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+    return new TypeDescriptor<OutputT>(getClass()) {};
+  }
+
+  /**
+   * Interface for runner implementors to provide implementations of extra context information.
+   *
+   * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
+   * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
+   * has indicated it needs the given extra context.
+   *
+   * <p>In the case of {@link ProcessElement} it is called once per invocation of
+   * {@link ProcessElement}.
+   */
+  public interface ExtraContextFactory<InputT, OutputT> {
+    /**
+     * Construct the {@link BoundedWindow} to use within a {@link DoFn} that
+     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
+     * {@link BoundedWindow}.
+     *
+     * @return {@link BoundedWindow} of the element currently being processed.
+     */
+    BoundedWindow window();
+
+    /**
+     * Construct the {@link WindowingInternals} to use within a {@link DoFn} that
+     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
+     * {@link WindowingInternals}.
+     */
+    WindowingInternals<InputT, OutputT> windowingInternals();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Annotation for the method to use to prepare an instance for processing a batch of elements.
+   * The method annotated with this must satisfy the following constraints:
+   * <ul>
+   *   <li>It must have at least one argument.
+   *   <li>Its first (and only) argument must be a {@link DoFn.Context}.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface StartBundle {}
+
+  /**
+   * Annotation for the method to use for processing elements. A subclass of
+   * {@link DoFn} must have a method with this annotation satisfying
+   * the following constraints in order for it to be executable:
+   * <ul>
+   *   <li>It must have at least one argument.
+   *   <li>Its first argument must be a {@link DoFn.ProcessContext}.
+   *   <li>Its remaining arguments must be {@link BoundedWindow}, or
+   *   {@link WindowingInternals WindowingInternals&lt;InputT, OutputT&gt;}.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface ProcessElement {}
+
+  /**
+   * Annotation for the method to use to finish processing a batch of elements.
+   * The method annotated with this must satisfy the following constraints:
+   * <ul>
+   *   <li>It must have at least one argument.
+   *   <li>Its first (and only) argument must be a {@link DoFn.Context}.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface FinishBundle {}
+
+  /**
+   * Returns an {@link Aggregator} with aggregation logic specified by the
+   * {@link CombineFn} argument. The name provided must be unique across
+   * {@link Aggregator}s created within the {@code DoFn}. Aggregators can only be created
+   * during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link CombineFn} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this {@code DoFn}
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline execution.
+   */
+  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+      createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+    checkNotNull(name, "name cannot be null");
+    checkNotNull(combiner, "combiner cannot be null");
+    checkArgument(!aggregators.containsKey(name),
+        "Cannot create aggregator with name %s."
+        + " An Aggregator with that name already exists within this scope.",
+        name);
+    checkState(!aggregatorsAreFinal,
+        "Cannot create an aggregator during pipeline execution."
+        + " Aggregators should be registered during pipeline construction.");
+
+    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
+        new DelegatingAggregator<>(name, combiner);
+    aggregators.put(name, aggregator);
+    return aggregator;
+  }
+
+  /**
+   * Returns an {@link Aggregator} with the aggregation logic specified by the
+   * {@link SerializableFunction} argument. The name provided must be unique
+   * across {@link Aggregator}s created within the {@code DoFn}. Aggregators can only be
+   * created during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link SerializableFunction} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this {@code DoFn}
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline execution.
+   */
+  public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
+      String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
+    checkNotNull(combiner, "combiner cannot be null.");
+    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
+  }
+
+  /**
+   * Finalize the {@link DoFn} construction to prepare for processing.
+   * This method should be called by runners before any processing methods.
+   */
+  public void prepareForProcessing() {
+    aggregatorsAreFinal = true;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>By default, does not register any display data. Implementors may override this method
+   * to provide their own display data.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index d8d4181..b504cb4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -18,10 +18,10 @@
 package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFnWithContext.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement;
-import org.apache.beam.sdk.transforms.DoFnWithContext.StartBundle;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.StartBundle;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -94,7 +94,7 @@ import javax.annotation.Nullable;
 
 
 /**
- * Utility implementing the necessary reflection for working with {@link DoFnWithContext}s.
+ * Utility implementing the necessary reflection for working with {@link DoFn}s.
  */
 public abstract class DoFnReflector {
 
@@ -109,7 +109,7 @@ public abstract class DoFnReflector {
 
   /**
    * Enumeration of the parameters available from the {@link ExtraContextFactory} to use as
-   * additional parameters for {@link DoFnWithContext} methods.
+   * additional parameters for {@link DoFn} methods.
    * <p>
    * We don't rely on looking for properly annotated methods within {@link ExtraContextFactory}
    * because erasure would make it impossible to completely fill in the type token for context
@@ -139,7 +139,7 @@ public abstract class DoFnReflector {
 
     /**
      * Create a type token representing the given parameter. May use the type token associated
-     * with the input and output types of the {@link DoFnWithContext}, depending on the extra
+     * with the input and output types of the {@link DoFn}, depending on the extra
      * context.
      */
     abstract <InputT, OutputT> TypeToken<?> tokenFor(
@@ -190,22 +190,22 @@ public abstract class DoFnReflector {
   }
 
   /**
-   * @return true if the reflected {@link DoFnWithContext} uses a Single Window.
+   * @return true if the reflected {@link DoFn} uses a Single Window.
    */
   public abstract boolean usesSingleWindow();
 
   /** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */
   public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
-      DoFnWithContext<InputT, OutputT> fn);
+      DoFn<InputT, OutputT> fn);
 
   private static final Map<Class<?>, DoFnReflector> REFLECTOR_CACHE =
       new LinkedHashMap<Class<?>, DoFnReflector>();
 
   /**
-   * @return the {@link DoFnReflector} for the given {@link DoFnWithContext}.
+   * @return the {@link DoFnReflector} for the given {@link DoFn}.
    */
   public static DoFnReflector of(
-      @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) {
+      @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
     DoFnReflector reflector = REFLECTOR_CACHE.get(fn);
     if (reflector != null) {
       return reflector;
@@ -217,9 +217,9 @@ public abstract class DoFnReflector {
   }
 
   /**
-   * Create a {@link OldDoFn} that the {@link DoFnWithContext}.
+   * Create an {@link OldDoFn} that adapts the given {@link DoFn}.
    */
-  public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
+  public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFn<InputT, OutputT> fn) {
     if (usesSingleWindow()) {
       return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
     } else {
@@ -259,7 +259,7 @@ public abstract class DoFnReflector {
   static <InputT, OutputT> List<AdditionalParameter> verifyProcessMethodArguments(Method m) {
     return verifyMethodArguments(m,
         EXTRA_PROCESS_CONTEXTS,
-        new TypeToken<DoFnWithContext<InputT, OutputT>.ProcessContext>() {},
+        new TypeToken<DoFn<InputT, OutputT>.ProcessContext>() {},
         new TypeParameter<InputT>() {},
         new TypeParameter<OutputT>() {});
   }
@@ -271,13 +271,13 @@ public abstract class DoFnReflector {
     }
     return verifyMethodArguments(m,
         EXTRA_CONTEXTS,
-        new TypeToken<DoFnWithContext<InputT, OutputT>.Context>() {},
+        new TypeToken<DoFn<InputT, OutputT>.Context>() {},
         new TypeParameter<InputT>() {},
         new TypeParameter<OutputT>() {});
   }
 
   /**
-   * Verify the method arguments for a given {@link DoFnWithContext} method.
+   * Verify the method arguments for a given {@link DoFn} method.
    *
    * <p>The requirements for a method to be valid, are:
    * <ol>
@@ -330,7 +330,7 @@ public abstract class DoFnReflector {
     // Fill in the generics in the allExtraContextArgs interface from the types in the
     // Context or ProcessContext OldDoFn.
     ParameterizedType pt = (ParameterizedType) contextToken.getType();
-    // We actually want the owner, since ProcessContext and Context are owned by DoFnWithContext.
+    // We actually want the owner, since ProcessContext and Context are owned by DoFn.
     pt = (ParameterizedType) pt.getOwnerType();
     @SuppressWarnings("unchecked")
     TypeToken<InputT> iActual = (TypeToken<InputT>) TypeToken.of(pt.getActualTypeArguments()[0]);
@@ -368,21 +368,21 @@ public abstract class DoFnReflector {
   public interface DoFnInvoker<InputT, OutputT>  {
     /** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. */
     void invokeStartBundle(
-        DoFnWithContext<InputT, OutputT>.Context c,
+        DoFn<InputT, OutputT>.Context c,
         ExtraContextFactory<InputT, OutputT> extra);
     /** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */
     void invokeFinishBundle(
-        DoFnWithContext<InputT, OutputT>.Context c,
+        DoFn<InputT, OutputT>.Context c,
         ExtraContextFactory<InputT, OutputT> extra);
 
     /** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */
     public void invokeProcessElement(
-        DoFnWithContext<InputT, OutputT>.ProcessContext c,
+        DoFn<InputT, OutputT>.ProcessContext c,
         ExtraContextFactory<InputT, OutputT> extra);
   }
 
   /**
-   * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFnWithContext}.
+   * Implementation of {@link DoFnReflector} for the arbitrary {@link DoFn}.
    */
   private static class GenericDoFnReflector extends DoFnReflector {
 
@@ -395,7 +395,7 @@ public abstract class DoFnReflector {
     private final Constructor<?> constructor;
 
     private GenericDoFnReflector(
-        @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> fn) {
+        @SuppressWarnings("rawtypes") Class<? extends DoFn> fn) {
       // Locate the annotated methods
       this.processElement = findAnnotatedMethod(ProcessElement.class, fn, true);
       this.startBundle = findAnnotatedMethod(StartBundle.class, fn, false);
@@ -442,7 +442,7 @@ public abstract class DoFnReflector {
     private static Method findAnnotatedMethod(
         Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
       Collection<Method> matches = declaredMethodsWithAnnotation(
-          anno, fnClazz, DoFnWithContext.class);
+          anno, fnClazz, DoFn.class);
 
       if (matches.size() == 0) {
         if (required == true) {
@@ -493,12 +493,12 @@ public abstract class DoFnReflector {
 
     /**
      * Use ByteBuddy to generate the code for a {@link DoFnInvoker} that invokes the given
-     * {@link DoFnWithContext}.
+     * {@link DoFn}.
      * @param clazz
      * @return
      */
     private Constructor<? extends DoFnInvoker<?, ?>> createInvokerConstructor(
-        @SuppressWarnings("rawtypes") Class<? extends DoFnWithContext> clazz) {
+        @SuppressWarnings("rawtypes") Class<? extends DoFn> clazz) {
 
       final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(clazz);
 
@@ -545,7 +545,7 @@ public abstract class DoFnReflector {
 
     @Override
     public <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
-        DoFnWithContext<InputT, OutputT> fn) {
+        DoFn<InputT, OutputT> fn) {
       try {
         @SuppressWarnings("unchecked")
         DoFnInvoker<InputT, OutputT> invoker =
@@ -562,13 +562,13 @@ public abstract class DoFnReflector {
   }
 
   private static class ContextAdapter<InputT, OutputT>
-      extends DoFnWithContext<InputT, OutputT>.Context
-      implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
+      extends DoFn<InputT, OutputT>.Context
+      implements DoFn.ExtraContextFactory<InputT, OutputT> {
 
     private OldDoFn<InputT, OutputT>.Context context;
 
     private ContextAdapter(
-        DoFnWithContext<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+        DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
       fn.super();
       this.context = context;
     }
@@ -600,14 +600,14 @@ public abstract class DoFnReflector {
 
     @Override
     public BoundedWindow window() {
-      // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
+      // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
       // should be unreachable.
       throw new UnsupportedOperationException("Can only get the window in ProcessElements");
     }
 
     @Override
     public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The DoFnWithContext doesn't allow us to ask for these outside ProcessElements, so this
+      // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
       // should be unreachable.
       throw new UnsupportedOperationException(
           "Can only get the windowingInternals in ProcessElements");
@@ -615,13 +615,13 @@ public abstract class DoFnReflector {
   }
 
   private static class ProcessContextAdapter<InputT, OutputT>
-      extends DoFnWithContext<InputT, OutputT>.ProcessContext
-      implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFn.ExtraContextFactory<InputT, OutputT> {
 
     private OldDoFn<InputT, OutputT>.ProcessContext context;
 
     private ProcessContextAdapter(
-        DoFnWithContext<InputT, OutputT> fn,
+        DoFn<InputT, OutputT> fn,
         OldDoFn<InputT, OutputT>.ProcessContext context) {
       fn.super();
       this.context = context;
@@ -693,10 +693,10 @@ public abstract class DoFnReflector {
 
   private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
 
-    private final DoFnWithContext<InputT, OutputT> fn;
+    private final DoFn<InputT, OutputT> fn;
     private transient DoFnInvoker<InputT, OutputT> invoker;
 
-    private SimpleDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
+    private SimpleDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
       super(fn.aggregators);
       this.fn = fn;
       this.invoker = reflector.bindInvoker(fn);
@@ -745,7 +745,7 @@ public abstract class DoFnReflector {
   private static class WindowDoFnAdapter<InputT, OutputT>
   extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess {
 
-    private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
+    private WindowDoFnAdapter(DoFnReflector reflector, DoFn<InputT, OutputT> fn) {
       super(reflector, fn);
     }
   }
@@ -770,7 +770,7 @@ public abstract class DoFnReflector {
         try {
           prepareMethod = new MethodLocator.ForExplicitMethod(
               new MethodDescription.ForLoadedMethod(
-                  DoFnWithContext.class.getDeclaredMethod("prepareForProcessing")))
+                  DoFn.class.getDeclaredMethod("prepareForProcessing")))
           .resolve(instrumentedMethod);
         } catch (NoSuchMethodException | SecurityException e) {
           throw new RuntimeException("Unable to locate prepareForProcessing method", e);
@@ -817,7 +817,7 @@ public abstract class DoFnReflector {
 
   /**
    * A byte-buddy {@link Implementation} that delegates a call that receives
-   * {@link AdditionalParameter} to the given {@link DoFnWithContext} method.
+   * {@link AdditionalParameter} to the given {@link DoFn} method.
    */
   private static final class InvokerDelegation implements Implementation {
     @Nullable
@@ -845,7 +845,7 @@ public abstract class DoFnReflector {
 
     /**
      * Generate the {@link Implementation} of one of the life-cycle methods of a
-     * {@link DoFnWithContext}.
+     * {@link DoFn}.
      */
     private static Implementation create(
         @Nullable final Method target, BeforeDelegation before, List<AdditionalParameter> args) {
@@ -869,7 +869,7 @@ public abstract class DoFnReflector {
     }
 
     /**
-     * Stack manipulation to push the {@link DoFnWithContext} reference stored in the
+     * Stack manipulation to push the {@link DoFn} reference stored in the
      * delegate field of the invoker on to the top of the stack.
      *
      * <p>This implementation is derived from the code for
@@ -1018,7 +1018,7 @@ public abstract class DoFnReflector {
   /**
    * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
    * for a constructor that takes a single argument and assigns it to the delegate field.
-   * {@link AdditionalParameter} to the given {@link DoFnWithContext} method.
+   * {@link AdditionalParameter} to the given {@link DoFn} method.
    */
   private static final class InvokerConstructor implements Implementation {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 9336e4c..f44a9ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -90,7 +90,7 @@ public class DoFnTester<InputT, OutputT> {
    */
   @SuppressWarnings("unchecked")
   public static <InputT, OutputT> DoFnTester<InputT, OutputT>
-      of(DoFnWithContext<InputT, OutputT> fn) {
+      of(DoFn<InputT, OutputT> fn) {
     return new DoFnTester<InputT, OutputT>(DoFnReflector.of(fn.getClass()).toDoFn(fn));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
deleted file mode 100644
index b27163a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFnWithContext}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFnWithContext}s can be tested in a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output.  Unit testing of a {@code DoFnWithContext},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>Implementations must define a method annotated with {@link ProcessElement}
- * that satisfies the requirements described there. See the {@link ProcessElement}
- * for details.
- *
- * <p>This functionality is experimental and likely to change.
- *
- * <p>Example usage:
- *
- * <pre> {@code
- * PCollection<String> lines = ... ;
- * PCollection<String> words =
- *     lines.apply(ParDo.of(new DoFnWithContext<String, String>() {
- *         @ProcessElement
- *         public void processElement(ProcessContext c, BoundedWindow window) {
- *
- *         }}));
- * } </pre>
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-@Experimental
-public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, HasDisplayData {
-
-  /** Information accessible to all methods in this {@code DoFnWithContext}. */
-  public abstract class Context {
-
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link org.apache.beam.sdk.runners.PipelineRunner}
-     * invoking this {@code DoFnWithContext}.  The {@code PipelineOptions} will
-     * be the default running via {@link DoFnTester}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Adds the given element to the main output {@code PCollection}.
-     *
-     * <p>Once passed to {@code output} the element should not be modified in
-     * any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the output
-     * element will have the same timestamp and be in the same windows
-     * as the input element passed to the method annotated with
-     * {@code @ProcessElement}.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Adds the given element to the main output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link OldDoFn#getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Adds the given element to the side output {@code PCollection} with the
-     * given tag.
-     *
-     * <p>Once passed to {@code sideOutput} the element should not be modified
-     * in any way.
-     *
-     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to
-     * specify the tags of side outputs that it consumes. Non-consumed side
-     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
-     * need to be specified.
-     *
-     * <p>The output element will have the same timestamp and be in the same
-     * windows as the input element passed to {@link ProcessElement}.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-    /**
-     * Adds the given element to the specified side output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link OldDoFn#getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutputWithTimestamp(
-        TupleTag<T> tag, T output, Instant timestamp);
-  }
-
-  /**
-   * Information accessible when running a method annotated with {@link ProcessElement}.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Returns the input element to be processed.
-     *
-     * <p>The element will not be changed -- it is safe to cache, etc.
-     * without copying.
-     */
-    public abstract InputT element();
-
-
-    /**
-     * Returns the value of the side input.
-     *
-     * @throws IllegalArgumentException if this is not a side input
-     * @see ParDo#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Returns the timestamp of the input element.
-     *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns information about the pane within this window into which the
-     * input element has been assigned.
-     *
-     * <p>Generally all data is in a single, uninteresting pane unless custom
-     * triggering and/or late data has been explicitly requested.
-     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract PaneInfo pane();
-  }
-
-  /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link DoFnWithContext.Context#outputWithTimestamp}.
-   *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward to the future.  For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
-   */
-  public Duration getAllowedTimestampSkew() {
-    return Duration.ZERO;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
-
-  /**
-   * Protects aggregators from being created after initialization.
-   */
-  private boolean aggregatorsAreFinal;
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code DoFnWithContext} instance's most-derived
-   * class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(getClass()) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code DoFnWithContext} instance's
-   * most-derived class.
-   *
-   * <p>In the normal case of a concrete {@code DoFnWithContext} subclass with
-   * no generic type parameters of its own (including anonymous inner
-   * classes), this will be a complete non-generic type, which is good
-   * for choosing a default output {@code Coder<O>} for the output
-   * {@code PCollection<O>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(getClass()) {};
-  }
-
-  /**
-   * Interface for runner implementors to provide implementations of extra context information.
-   *
-   * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
-   * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
-   * has indicated it needs the given extra context.
-   *
-   * <p>In the case of {@link ProcessElement} it is called once per invocation of
-   * {@link ProcessElement}.
-   */
-  public interface ExtraContextFactory<InputT, OutputT> {
-    /**
-     * Construct the {@link BoundedWindow} to use within a {@link DoFnWithContext} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link BoundedWindow}.
-     *
-     * @return {@link BoundedWindow} of the element currently being processed.
-     */
-    BoundedWindow window();
-
-    /**
-     * Construct the {@link WindowingInternals} to use within a {@link DoFnWithContext} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link WindowingInternals}.
-     */
-    WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Annotation for the method to use to prepare an instance for processing a batch of elements.
-   * The method annotated with this must satisfy the following constraints:
-   * <ul>
-   *   <li>It must have at least one argument.
-   *   <li>Its first (and only) argument must be a {@link DoFnWithContext.Context}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface StartBundle {}
-
-  /**
-   * Annotation for the method to use for processing elements. A subclass of
-   * {@link DoFnWithContext} must have a method with this annotation satisfying
-   * the following constraints in order for it to be executable:
-   * <ul>
-   *   <li>It must have at least one argument.
-   *   <li>Its first argument must be a {@link DoFnWithContext.ProcessContext}.
-   *   <li>Its remaining arguments must be {@link BoundedWindow}, or
-   *   {@link WindowingInternals WindowingInternals&lt;InputT, OutputT&gt;}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface ProcessElement {}
-
-  /**
-   * Annotation for the method to use to finish processing a batch of elements.
-   * The method annotated with this must satisfy the following constraints:
-   * <ul>
-   *   <li>It must have at least one argument.
-   *   <li>Its first (and only) argument must be a {@link DoFnWithContext.Context}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface FinishBundle {}
-
-  /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
-   * during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this OldDoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline execution.
-   */
-  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(!aggregators.containsKey(name),
-        "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
-        name);
-    checkState(!aggregatorsAreFinal,
-        "Cannot create an aggregator during pipeline execution."
-        + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, aggregator);
-    return aggregator;
-  }
-
-  /**
-   * Returns an {@link Aggregator} with the aggregation logic specified by the
-   * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
-   * created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this OldDoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline execution.
-   */
-  public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
-      String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
-   * Finalize the {@link DoFnWithContext} construction to prepare for processing.
-   * This method should be called by runners before any processing methods.
-   */
-  void prepareForProcessing() {
-    aggregatorsAreFinal = true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 48c6033..f640442 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -63,7 +63,7 @@ import java.util.UUID;
  * separately from any {@code ParDo} transform or {@code Pipeline},
  * can be done via the {@link DoFnTester} harness.
  *
- * <p>{@link DoFnWithContext} (currently experimental) offers an alternative
+ * <p>{@link DoFn} (currently experimental) offers an alternative
  * mechanism for accessing {@link ProcessContext#window()} without the need
  * to implement {@link RequiresWindowAccess}.
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 36d8101..bb1af9c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -527,13 +527,13 @@ public class ParDo {
   }
 
   private static <InputT, OutputT> OldDoFn<InputT, OutputT>
-      adapt(DoFnWithContext<InputT, OutputT> fn) {
+      adapt(DoFn<InputT, OutputT> fn) {
     return DoFnReflector.of(fn.getClass()).toDoFn(fn);
   }
 
   /**
    * Creates a {@link ParDo} {@link PTransform} that will invoke the
-   * given {@link DoFnWithContext} function.
+   * given {@link DoFn} function.
    *
    * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
    * input being a {@code PCollection<InputT>} and the output a
@@ -541,11 +541,11 @@ public class ParDo {
    * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
    * properties can be set on it first.
    *
-   * <p>{@link DoFnWithContext} is an experimental alternative to
+   * <p>{@link DoFn} is an experimental alternative to
    * {@link OldDoFn} which simplifies accessing the window of the element.
    */
   @Experimental
-  public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
+  public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
     return of(adapt(fn), fn.getClass());
   }
 
@@ -633,13 +633,13 @@ public class ParDo {
 
     /**
      * Returns a new {@link ParDo} {@link PTransform} that's like this
-     * transform but which will invoke the given {@link DoFnWithContext}
+     * transform but which will invoke the given {@link DoFn}
      * function, and which has its input and output types bound. Does
      * not modify this transform. The resulting {@link PTransform} is
      * sufficiently specified to be applied, but more properties can
      * still be specified.
      */
-    public <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
+    public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       return of(adapt(fn), fn.getClass());
     }
   }
@@ -845,12 +845,12 @@ public class ParDo {
     /**
      * Returns a new multi-output {@link ParDo} {@link PTransform}
      * that's like this transform but which will invoke the given
-     * {@link DoFnWithContext} function, and which has its input type bound.
+     * {@link DoFn} function, and which has its input type bound.
      * Does not modify this transform. The resulting
      * {@link PTransform} is sufficiently specified to be applied, but
      * more properties can still be specified.
      */
-    public <InputT> BoundMulti<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
+    public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
       return of(adapt(fn), fn.getClass());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
index 0cb3d7b..df9e441 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
@@ -21,10 +21,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.transforms.DoFnWithContext.Context;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -61,13 +61,13 @@ public class DoFnReflectorTest {
     }
   }
 
-  private DoFnWithContext<String, String> fn;
+  private DoFn<String, String> fn;
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Mock
-  private DoFnWithContext<String, String>.ProcessContext mockContext;
+  private DoFn<String, String>.ProcessContext mockContext;
   @Mock
   private BoundedWindow mockWindow;
   @Mock
@@ -91,7 +91,7 @@ public class DoFnReflectorTest {
     };
   }
 
-  private DoFnReflector underTest(DoFnWithContext<String, String> fn) {
+  private DoFnReflector underTest(DoFn<String, String> fn) {
     this.fn = fn;
     return DoFnReflector.of(fn.getClass());
   }
@@ -141,7 +141,7 @@ public class DoFnReflectorTest {
   @Test
   public void testDoFnWithNoExtraContext() throws Exception {
     final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+    DoFnReflector reflector = underTest(new DoFn<String, String>() {
 
       @ProcessElement
       public void processElement(ProcessContext c)
@@ -172,19 +172,19 @@ public class DoFnReflectorTest {
 
   interface InterfaceWithProcessElement {
     @ProcessElement
-    void processElement(DoFnWithContext<String, String>.ProcessContext c);
+    void processElement(DoFn<String, String>.ProcessContext c);
   }
 
   interface LayersOfInterfaces extends InterfaceWithProcessElement {}
 
   private class IdentityUsingInterfaceWithProcessElement
-      extends DoFnWithContext<String, String>
+      extends DoFn<String, String>
       implements LayersOfInterfaces {
 
     private Invocations invocations = new Invocations("Named Class");
 
     @Override
-    public void processElement(DoFnWithContext<String, String>.ProcessContext c) {
+    public void processElement(DoFn<String, String>.ProcessContext c) {
       invocations.wasProcessElementInvoked = true;
       assertSame(c, mockContext);
     }
@@ -198,7 +198,7 @@ public class DoFnReflectorTest {
     checkInvokeProcessElementWorks(reflector, fn.invocations);
   }
 
-  private class IdentityParent extends DoFnWithContext<String, String> {
+  private class IdentityParent extends DoFn<String, String> {
     protected Invocations parentInvocations = new Invocations("IdentityParent");
 
     @ProcessElement
@@ -215,7 +215,7 @@ public class DoFnReflectorTest {
     protected Invocations childInvocations = new Invocations("IdentityChildWithOverride");
 
     @Override
-    public void process(DoFnWithContext<String, String>.ProcessContext c) {
+    public void process(DoFn<String, String>.ProcessContext c) {
       super.process(c);
       childInvocations.wasProcessElementInvoked = true;
     }
@@ -240,7 +240,7 @@ public class DoFnReflectorTest {
   @Test
   public void testDoFnWithWindow() throws Exception {
     final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+    DoFnReflector reflector = underTest(new DoFn<String, String>() {
 
       @ProcessElement
       public void processElement(ProcessContext c, BoundedWindow w)
@@ -259,7 +259,7 @@ public class DoFnReflectorTest {
   @Test
   public void testDoFnWithWindowingInternals() throws Exception {
     final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+    DoFnReflector reflector = underTest(new DoFn<String, String>() {
 
       @ProcessElement
       public void processElement(ProcessContext c, WindowingInternals<String, String> w)
@@ -278,7 +278,7 @@ public class DoFnReflectorTest {
   @Test
   public void testDoFnWithStartBundle() throws Exception {
     final Invocations invocations = new Invocations("AnonymousClass");
-    DoFnReflector reflector = underTest(new DoFnWithContext<String, String>() {
+    DoFnReflector reflector = underTest(new DoFn<String, String>() {
       @ProcessElement
       public void processElement(@SuppressWarnings("unused") ProcessContext c) {}
 
@@ -304,7 +304,7 @@ public class DoFnReflectorTest {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("No method annotated with @ProcessElement found");
     thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFnWithContext<String, String>() {});
+    underTest(new DoFn<String, String>() {});
   }
 
   @Test
@@ -314,7 +314,7 @@ public class DoFnReflectorTest {
     thrown.expectMessage("foo()");
     thrown.expectMessage("bar()");
     thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFnWithContext<String, String>() {
+    underTest(new DoFn<String, String>() {
       @ProcessElement
       public void foo() {}
 
@@ -330,7 +330,7 @@ public class DoFnReflectorTest {
     thrown.expectMessage("bar()");
     thrown.expectMessage("baz()");
     thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFnWithContext<String, String>() {
+    underTest(new DoFn<String, String>() {
       @ProcessElement
       public void foo() {}
 
@@ -349,7 +349,7 @@ public class DoFnReflectorTest {
     thrown.expectMessage("bar()");
     thrown.expectMessage("baz()");
     thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFnWithContext<String, String>() {
+    underTest(new DoFn<String, String>() {
       @ProcessElement
       public void foo() {}
 
@@ -361,7 +361,7 @@ public class DoFnReflectorTest {
     });
   }
 
-  private static class PrivateDoFnClass extends DoFnWithContext<String, String> {
+  private static class PrivateDoFnClass extends DoFn<String, String> {
     final Invocations invocations = new Invocations(getClass().getName());
 
     @ProcessElement
@@ -429,7 +429,7 @@ public class DoFnReflectorTest {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("process() must be public");
     thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFnWithContext<String, String>() {
+    underTest(new DoFn<String, String>() {
       @ProcessElement
       private void process() {}
     });
@@ -440,7 +440,7 @@ public class DoFnReflectorTest {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("startBundle() must be public");
     thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFnWithContext<String, String>() {
+    underTest(new DoFn<String, String>() {
       @ProcessElement
       public void processElement() {}
 
@@ -454,7 +454,7 @@ public class DoFnReflectorTest {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("finishBundle() must be public");
     thrown.expectMessage(getClass().getName() + "$");
-    underTest(new DoFnWithContext<String, String>() {
+    underTest(new DoFn<String, String>() {
       @ProcessElement
       public void processElement() {}
 
@@ -490,7 +490,7 @@ public class DoFnReflectorTest {
   }
 
   @SuppressWarnings({"unused"})
-  private void badExtraContext(DoFnWithContext<Integer, String>.Context c, int n) {}
+  private void badExtraContext(DoFn<Integer, String>.Context c, int n) {}
 
   @Test
   public void testBadExtraContext() throws Exception {
@@ -505,7 +505,7 @@ public class DoFnReflectorTest {
 
   @SuppressWarnings({"unused"})
   private void badExtraProcessContext(
-      DoFnWithContext<Integer, String>.ProcessContext c, Integer n) {}
+      DoFn<Integer, String>.ProcessContext c, Integer n) {}
 
   @Test
   public void testBadExtraProcessContextType() throws Exception {
@@ -534,58 +534,58 @@ public class DoFnReflectorTest {
   }
 
   @SuppressWarnings("unused")
-  private void goodGenerics(DoFnWithContext<Integer, String>.ProcessContext c,
+  private void goodGenerics(DoFn<Integer, String>.ProcessContext c,
       WindowingInternals<Integer, String> i1) {}
 
   @Test
   public void testValidGenerics() throws Exception {
     Method method = getClass().getDeclaredMethod("goodGenerics",
-        DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+        DoFn.ProcessContext.class, WindowingInternals.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
-  private void goodWildcards(DoFnWithContext<Integer, String>.ProcessContext c,
+  private void goodWildcards(DoFn<Integer, String>.ProcessContext c,
       WindowingInternals<?, ?> i1) {}
 
   @Test
   public void testGoodWildcards() throws Exception {
     Method method = getClass().getDeclaredMethod("goodWildcards",
-        DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+        DoFn.ProcessContext.class, WindowingInternals.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
-  private void goodBoundedWildcards(DoFnWithContext<Integer, String>.ProcessContext c,
+  private void goodBoundedWildcards(DoFn<Integer, String>.ProcessContext c,
       WindowingInternals<? super Integer, ? super String> i1) {}
 
   @Test
   public void testGoodBoundedWildcards() throws Exception {
     Method method = getClass().getDeclaredMethod("goodBoundedWildcards",
-        DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+        DoFn.ProcessContext.class, WindowingInternals.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
   private <InputT, OutputT> void goodTypeVariables(
-      DoFnWithContext<InputT, OutputT>.ProcessContext c,
+      DoFn<InputT, OutputT>.ProcessContext c,
       WindowingInternals<InputT, OutputT> i1) {}
 
   @Test
   public void testGoodTypeVariables() throws Exception {
     Method method = getClass().getDeclaredMethod("goodTypeVariables",
-        DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+        DoFn.ProcessContext.class, WindowingInternals.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
-  private void badGenericTwoArgs(DoFnWithContext<Integer, String>.ProcessContext c,
+  private void badGenericTwoArgs(DoFn<Integer, String>.ProcessContext c,
       WindowingInternals<Integer, Integer> i1) {}
 
   @Test
   public void testBadGenericsTwoArgs() throws Exception {
     Method method = getClass().getDeclaredMethod("badGenericTwoArgs",
-        DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+        DoFn.ProcessContext.class, WindowingInternals.class);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Incompatible generics in context parameter "
@@ -598,13 +598,13 @@ public class DoFnReflectorTest {
   }
 
   @SuppressWarnings("unused")
-  private void badGenericWildCards(DoFnWithContext<Integer, String>.ProcessContext c,
+  private void badGenericWildCards(DoFn<Integer, String>.ProcessContext c,
       WindowingInternals<Integer, ? super Integer> i1) {}
 
   @Test
   public void testBadGenericWildCards() throws Exception {
     Method method = getClass().getDeclaredMethod("badGenericWildCards",
-        DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+        DoFn.ProcessContext.class, WindowingInternals.class);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Incompatible generics in context parameter "
@@ -617,13 +617,13 @@ public class DoFnReflectorTest {
   }
 
   @SuppressWarnings("unused")
-  private <InputT, OutputT> void badTypeVariables(DoFnWithContext<InputT, OutputT>.ProcessContext c,
+  private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
       WindowingInternals<InputT, InputT> i1) {}
 
   @Test
   public void testBadTypeVariables() throws Exception {
     Method method = getClass().getDeclaredMethod("badTypeVariables",
-        DoFnWithContext.ProcessContext.class, WindowingInternals.class);
+        DoFn.ProcessContext.class, WindowingInternals.class);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Incompatible generics in context parameter "
@@ -636,7 +636,7 @@ public class DoFnReflectorTest {
 
   @Test
   public void testProcessElementException() throws Exception {
-    DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() {
+    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
       @ProcessElement
       public void processElement(@SuppressWarnings("unused") ProcessContext c) {
         throw new IllegalArgumentException("bogus");
@@ -650,7 +650,7 @@ public class DoFnReflectorTest {
 
   @Test
   public void testStartBundleException() throws Exception {
-    DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() {
+    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
       @StartBundle
       public void startBundle(@SuppressWarnings("unused") Context c) {
         throw new IllegalArgumentException("bogus");
@@ -668,7 +668,7 @@ public class DoFnReflectorTest {
 
   @Test
   public void testFinishBundleException() throws Exception {
-    DoFnWithContext<Integer, Integer> fn = new DoFnWithContext<Integer, Integer>() {
+    DoFn<Integer, Integer> fn = new DoFn<Integer, Integer>() {
       @FinishBundle
       public void finishBundle(@SuppressWarnings("unused") Context c) {
         throw new IllegalArgumentException("bogus");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
new file mode 100644
index 0000000..c7e8972
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/** Tests for {@link DoFn}. */
+@RunWith(JUnit4.class)
+public class DoFnTest implements Serializable {
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  private class NoOpDoFn extends DoFn<Void, Void> {
+
+    /**
+     * @param c context
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+    }
+  }
+
+  @Test
+  public void testCreateAggregatorWithCombinerSucceeds() {
+    String name = "testAggregator";
+    Sum.SumLongFn combiner = new Sum.SumLongFn();
+
+    DoFn<Void, Void> doFn = new NoOpDoFn();
+
+    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
+
+    assertEquals(name, aggregator.getName());
+    assertEquals(combiner, aggregator.getCombineFn());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullNameThrowsException() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("name cannot be null");
+
+    DoFn<Void, Void> doFn = new NoOpDoFn();
+
+    doFn.createAggregator(null, new Sum.SumLongFn());
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullCombineFnThrowsException() {
+    CombineFn<Object, Object, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    DoFn<Void, Void> doFn = new NoOpDoFn();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
+    SerializableFunction<Iterable<Object>, Object> combiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    DoFn<Void, Void> doFn = new NoOpDoFn();
+
+    doFn.createAggregator("testAggregator", combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorWithSameNameThrowsException() {
+    String name = "testAggregator";
+    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+    DoFn<Void, Void> doFn = new NoOpDoFn();
+
+    doFn.createAggregator(name, combiner);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot create");
+    thrown.expectMessage(name);
+    thrown.expectMessage("already exists");
+
+    doFn.createAggregator(name, combiner);
+  }
+
+  @Test
+  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
+    String nameOne = "testAggregator";
+    String nameTwo = "aggregatorPrime";
+    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+    DoFn<Void, Void> doFn = new NoOpDoFn();
+
+    Aggregator<Double, Double> aggregatorOne =
+        doFn.createAggregator(nameOne, combiner);
+    Aggregator<Double, Double> aggregatorTwo =
+        doFn.createAggregator(nameTwo, combiner);
+
+    assertNotEquals(aggregatorOne, aggregatorTwo);
+  }
+
+  @Test
+  public void testDoFnWithContextUsingAggregators() {
+    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
+    OldDoFn<Object, Object>.Context context = noOpFn.context();
+
+    OldDoFn<Object, Object> fn = spy(noOpFn);
+    context = spy(context);
+
+    @SuppressWarnings("unchecked")
+    Aggregator<Long, Long> agg = mock(Aggregator.class);
+
+    Sum.SumLongFn combiner = new Sum.SumLongFn();
+    Aggregator<Long, Long> delegateAggregator =
+        fn.createAggregator("test", combiner);
+
+    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
+
+    context.setupDelegateAggregators();
+    delegateAggregator.addValue(1L);
+
+    verify(agg).addValue(1L);
+  }
+
+  @Test
+  public void testDefaultPopulateDisplayDataImplementation() {
+    DoFn<String, String> fn = new DoFn<String, String>() {
+    };
+    DisplayData displayData = DisplayData.from(fn);
+    assertThat(displayData.items(), empty());
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCreateAggregatorInStartBundleThrows() {
+    TestPipeline p = createTestPipeline(new DoFn<String, String>() {
+      @StartBundle
+      public void startBundle(Context c) {
+        createAggregator("anyAggregate", new MaxIntegerFn());
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {}
+    });
+
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCreateAggregatorInProcessElementThrows() {
+    TestPipeline p = createTestPipeline(new DoFn<String, String>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        createAggregator("anyAggregate", new MaxIntegerFn());
+      }
+    });
+
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCreateAggregatorInFinishBundleThrows() {
+    TestPipeline p = createTestPipeline(new DoFn<String, String>() {
+      @FinishBundle
+      public void finishBundle(Context c) {
+        createAggregator("anyAggregate", new MaxIntegerFn());
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {}
+    });
+
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+
+    p.run();
+  }
+
+  /**
+   * Initialize a test pipeline with the specified {@link DoFn}.
+   */
+  private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) {
+    TestPipeline pipeline = TestPipeline.create();
+    pipeline.apply(Create.of((InputT) null))
+     .apply(ParDo.of(fn));
+
+    return pipeline;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bcb6f46/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
deleted file mode 100644
index 0a910b8..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-
-/** Tests for {@link DoFnWithContext}. */
-@RunWith(JUnit4.class)
-public class DoFnWithContextTest implements Serializable {
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-
-  private class NoOpDoFnWithContext extends DoFnWithContext<Void, Void> {
-
-    /**
-     * @param c context
-     */
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-    }
-  }
-
-  @Test
-  public void testCreateAggregatorWithCombinerSucceeds() {
-    String name = "testAggregator";
-    Sum.SumLongFn combiner = new Sum.SumLongFn();
-
-    DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
-    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullNameThrowsException() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("name cannot be null");
-
-    DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
-    doFn.createAggregator(null, new Sum.SumLongFn());
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullCombineFnThrowsException() {
-    CombineFn<Object, Object, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
-    SerializableFunction<Iterable<Object>, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithSameNameThrowsException() {
-    String name = "testAggregator";
-    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
-
-    DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
-    doFn.createAggregator(name, combiner);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Cannot create");
-    thrown.expectMessage(name);
-    thrown.expectMessage("already exists");
-
-    doFn.createAggregator(name, combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
-    String nameOne = "testAggregator";
-    String nameTwo = "aggregatorPrime";
-    CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
-
-    DoFnWithContext<Void, Void> doFn = new NoOpDoFnWithContext();
-
-    Aggregator<Double, Double> aggregatorOne =
-        doFn.createAggregator(nameOne, combiner);
-    Aggregator<Double, Double> aggregatorTwo =
-        doFn.createAggregator(nameTwo, combiner);
-
-    assertNotEquals(aggregatorOne, aggregatorTwo);
-  }
-
-  @Test
-  public void testDoFnWithContextUsingAggregators() {
-    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
-    OldDoFn<Object, Object>.Context context = noOpFn.context();
-
-    OldDoFn<Object, Object> fn = spy(noOpFn);
-    context = spy(context);
-
-    @SuppressWarnings("unchecked")
-    Aggregator<Long, Long> agg = mock(Aggregator.class);
-
-    Sum.SumLongFn combiner = new Sum.SumLongFn();
-    Aggregator<Long, Long> delegateAggregator =
-        fn.createAggregator("test", combiner);
-
-    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
-    context.setupDelegateAggregators();
-    delegateAggregator.addValue(1L);
-
-    verify(agg).addValue(1L);
-  }
-
-  @Test
-  public void testDefaultPopulateDisplayDataImplementation() {
-    DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>() {
-    };
-    DisplayData displayData = DisplayData.from(fn);
-    assertThat(displayData.items(), empty());
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInStartBundleThrows() {
-    TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
-      @StartBundle
-      public void startBundle(Context c) {
-        createAggregator("anyAggregate", new MaxIntegerFn());
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInProcessElementThrows() {
-    TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        createAggregator("anyAggregate", new MaxIntegerFn());
-      }
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInFinishBundleThrows() {
-    TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() {
-      @FinishBundle
-      public void finishBundle(Context c) {
-        createAggregator("anyAggregate", new MaxIntegerFn());
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
-
-  /**
-   * Initialize a test pipeline with the specified {@link OldDoFn}.
-   */
-  private <InputT, OutputT> TestPipeline createTestPipeline(DoFnWithContext<InputT, OutputT> fn) {
-    TestPipeline pipeline = TestPipeline.create();
-    pipeline.apply(Create.of((InputT) null))
-     .apply(ParDo.of(fn));
-
-    return pipeline;
-  }
-}