You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:19 UTC

[11/19] incubator-beam git commit: Rename DoFn to OldDoFn

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 2696020..ed9ec10 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -58,15 +58,15 @@ import java.util.Set;
  */
 public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
 
-  /** The DoFn being run. */
-  public final DoFn<InputT, OutputT> fn;
+  /** The OldDoFn being run. */
+  public final OldDoFn<InputT, OutputT> fn;
 
-  /** The context used for running the DoFn. */
+  /** The context used for running the OldDoFn. */
   public final DoFnContext<InputT, OutputT> context;
 
   protected DoFnRunnerBase(
       PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
+      OldDoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,
@@ -145,7 +145,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
   }
 
   /**
-   * Invokes {@link DoFn#processElement} after certain pre-processings has been done in
+   * Invokes {@link OldDoFn#processElement} after certain pre-processings has been done in
    * {@link DoFnRunnerBase#processElement}.
    */
   protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
@@ -162,17 +162,17 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
   }
 
   /**
-   * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
+   * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
    *
-   * @param <InputT> the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
+   * @param <InputT> the type of the OldDoFn's (main) input elements
+   * @param <OutputT> the type of the OldDoFn's (main) output elements
    */
   private static class DoFnContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.Context {
+      extends OldDoFn<InputT, OutputT>.Context {
     private static final int MAX_SIDE_OUTPUTS = 1000;
 
     final PipelineOptions options;
-    final DoFn<InputT, OutputT> fn;
+    final OldDoFn<InputT, OutputT> fn;
     final SideInputReader sideInputReader;
     final OutputManager outputManager;
     final TupleTag<OutputT> mainOutputTag;
@@ -187,7 +187,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
     private Set<TupleTag<?>> outputTags;
 
     public DoFnContext(PipelineOptions options,
-                       DoFn<InputT, OutputT> fn,
+                       OldDoFn<InputT, OutputT> fn,
                        SideInputReader sideInputReader,
                        OutputManager outputManager,
                        TupleTag<OutputT> mainOutputTag,
@@ -317,8 +317,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
     }
 
     // Following implementations of output, outputWithTimestamp, and sideOutput
-    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
-    // ProcessContext's versions in DoFn.processElement.
+    // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
+    // ProcessContext's versions in OldDoFn.processElement.
     @Override
     public void output(OutputT output) {
       outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
@@ -350,9 +350,10 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
   }
 
   /**
-   * Returns a new {@code DoFn.ProcessContext} for the given element.
+   * Returns a new {@code OldDoFn.ProcessContext} for the given element.
    */
-  protected DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
+  protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
+      WindowedValue<InputT> elem) {
     return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
   }
 
@@ -365,21 +366,21 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
   }
 
   /**
-   * A concrete implementation of {@code DoFn.ProcessContext} used for
-   * running a {@link DoFn} over a single element.
+   * A concrete implementation of {@code OldDoFn.ProcessContext} used for
+   * running a {@link OldDoFn} over a single element.
    *
-   * @param <InputT> the type of the DoFn's (main) input elements
-   * @param <OutputT> the type of the DoFn's (main) output elements
+   * @param <InputT> the type of the OldDoFn's (main) input elements
+   * @param <OutputT> the type of the OldDoFn's (main) output elements
    */
   static class DoFnProcessContext<InputT, OutputT>
-      extends DoFn<InputT, OutputT>.ProcessContext {
+      extends OldDoFn<InputT, OutputT>.ProcessContext {
 
 
-    final DoFn<InputT, OutputT> fn;
+    final OldDoFn<InputT, OutputT> fn;
     final DoFnContext<InputT, OutputT> context;
     final WindowedValue<InputT> windowedValue;
 
-    public DoFnProcessContext(DoFn<InputT, OutputT> fn,
+    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
                               DoFnContext<InputT, OutputT> context,
                               WindowedValue<InputT> windowedValue) {
       fn.super();
@@ -426,7 +427,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
     public BoundedWindow window() {
       if (!(fn instanceof RequiresWindowAccess)) {
         throw new UnsupportedOperationException(
-            "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+            "window() is only available in the context of a OldDoFn marked as"
+                + "RequiresWindowAccess.");
       }
       return Iterables.getOnlyElement(windows());
     }
@@ -484,7 +486,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu
         throw new IllegalArgumentException(String.format(
             "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
             + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-            + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
+            + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
             timestamp, windowedValue.getTimestamp(),
             PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
index cb96da2..a9f3cf4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.common.CounterSet;
 import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
+
 import java.util.List;
 
 /**
@@ -44,13 +45,13 @@ public class DoFnRunners {
   }
 
   /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
+   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}.
    *
-   * <p>It invokes {@link DoFn#processElement} for each input.
+   * <p>It invokes {@link OldDoFn#processElement} for each input.
    */
   public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
       PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
+      OldDoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,
@@ -71,13 +72,14 @@ public class DoFnRunners {
   }
 
   /**
-   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
+   * Returns a basic implementation of {@link DoFnRunner} that works for most
+   * {@link OldDoFn OldDoFns}.
    *
-   * <p>It invokes {@link DoFn#processElement} for each input.
+   * <p>It invokes {@link OldDoFn#processElement} for each input.
    */
   public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
       PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
+      OldDoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,
@@ -99,7 +101,7 @@ public class DoFnRunners {
   /**
    * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
    *
-   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
+   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
    */
   public static <K, InputT, OutputT, W extends BoundedWindow>
       DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
@@ -133,7 +135,7 @@ public class DoFnRunners {
   /**
    * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
    *
-   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
+   * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}.
    */
   public static <K, InputT, OutputT, W extends BoundedWindow>
   DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
@@ -160,7 +162,7 @@ public class DoFnRunners {
 
   public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
       PipelineOptions options,
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,
@@ -198,7 +200,7 @@ public class DoFnRunners {
 
   public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
       PipelineOptions options,
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
index b575559..f82e5df 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -19,14 +19,14 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 
 /**
- * DoFn that merges windows and groups elements in those windows, optionally
+ * OldDoFn that merges windows and groups elements in those windows, optionally
  * combining values.
  *
  * @param <K> key type
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.values.KV;
  */
 @SystemDoFnInternal
 public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
-    extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
   public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
   public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
index d185a24..f872ffc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.util;
 
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -52,7 +52,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
 
   @Override
   public void processElement(
-      DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
+      OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
           throws Exception {
     K key = c.element().getKey();
     // Used with Batch, we know that all the data is available for this key. We can't use the

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
index 8a0152e..f0f9007 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -22,8 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -138,7 +138,9 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
       return input
           .apply(
               ParDo.of(
-                  new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+                  new OldDoFn<
+                      KV<K, Iterable<WindowedValue<V>>>,
+                      KV<K, Iterable<WindowedValue<V>>>>() {
                     @Override
                     public void processElement(ProcessContext c) {
                       KV<K, Iterable<WindowedValue<V>>> kvs = c.element();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
index 4815162..8b3ba24 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
 
 /**
  * A customized {@link DoFnRunner} that handles late data dropping for
- * a {@link KeyedWorkItem} input {@link DoFn}.
+ * a {@link KeyedWorkItem} input {@link OldDoFn}.
  *
  * <p>It expands windows before checking data lateness.
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
index 812e99a..0c5849e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.util.state.ValueState;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.joda.time.Instant;
-
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index c879409..1fa0830 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -181,7 +181,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    * Store the previously emitted pane (if any) for each window.
    *
    * <ul>
-   * <li>State: The previous {@link PaneInfo} passed to the user's {@link DoFn#processElement},
+   * <li>State: The previous {@link PaneInfo} passed to the user's {@link OldDoFn#processElement},
    * if any.
    * <li>Style style: DIRECT
    * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
index e034638..a0cdb40 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
@@ -19,21 +19,21 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.util.ExecutionContext.StepContext;
 import org.apache.beam.sdk.values.TupleTag;
 import java.util.List;
 
 /**
- * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
+ * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in.
  *
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
+ * @param <InputT> the type of the OldDoFn's (main) input elements
+ * @param <OutputT> the type of the OldDoFn's (main) output elements
  */
 public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{
 
-  protected SimpleDoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn,
+  protected SimpleDoFnRunner(PipelineOptions options, OldDoFn<InputT, OutputT> fn,
       SideInputReader sideInputReader,
       OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
@@ -44,7 +44,7 @@ public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, Ou
 
   @Override
   protected void invokeProcessElement(WindowedValue<InputT> elem) {
-    final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
+    final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
     // This can contain user code. Wrap it in case it throws an exception.
     try {
       fn.processElement(processContext);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
index 985f210..5c17009 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
@@ -37,7 +37,6 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 import java.io.Serializable;
-
 import javax.annotation.Nullable;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index dc2413a..8d604cb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
 import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
 
 import static com.google.common.base.Preconditions.checkArgument;
+
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index e0ff879..feba191 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
index fb74fc6..f0c52b9 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/SimpleDoFnRunnerTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.util;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
 
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -62,7 +62,7 @@ public class SimpleDoFnRunnerTest {
     runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
   }
 
-  private DoFnRunner<String, String> createRunner(DoFn<String, String> fn) {
+  private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
     // Pass in only necessary parameters for the test
     List<TupleTag<?>> sideOutputTags = Arrays.asList();
     StepContext context = mock(StepContext.class);
@@ -70,7 +70,7 @@ public class SimpleDoFnRunnerTest {
           null, fn, null, null, null, sideOutputTags, context, null, null);
   }
 
-  static class ThrowingDoFn extends DoFn<String, String> {
+  static class ThrowingDoFn extends OldDoFn<String, String> {
     final Exception exceptionToThrow =
         new UnsupportedOperationException("Expected exception");
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 477da30..e052226 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -23,7 +23,7 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
@@ -106,7 +106,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
 
       StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals();
 
-      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
+      OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
           GroupAlsoByWindowViaWindowSetDoFn.create(
               windowingStrategy,
               new ConstantStateInternalsFactory<K>(stateInternals),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index dcbe3d1..8be12fd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -23,7 +23,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.MutationDetector;
 import org.apache.beam.sdk.util.MutationDetectors;
@@ -42,7 +42,7 @@ import org.joda.time.Instant;
  * elements added to the bundle will be encoded by the {@link Coder} of the underlying
  * {@link PCollection}.
  *
- * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
+ * <p>This catches errors during the execution of a {@link OldDoFn} caused by modifying an element
  * after it is added to an output {@link PCollection}.
  */
 class ImmutabilityCheckingBundleFactory implements BundleFactory {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index dd1cf37..6ef0ffe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.DoFnRunner;
 import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
@@ -48,7 +48,7 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
       DirectStepContext stepContext,
       CommittedBundle<InputT> inputBundle,
       AppliedPTransform<PCollection<InputT>, ?, ?> application,
-      DoFn<InputT, OutputT> fn,
+      OldDoFn<InputT, OutputT> fn,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index eda3db4..ce770ca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,7 +38,7 @@ import java.util.Map;
  * {@link BoundMulti} primitive {@link PTransform}.
  */
 class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
-  private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>>
+  private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>
       fnClones;
 
   public ParDoMultiEvaluatorFactory() {
@@ -46,9 +46,10 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
         CacheBuilder.newBuilder()
             .build(
                 new CacheLoader<
-                    AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<DoFn<?, ?>>>() {
+                    AppliedPTransform<?, ?, BoundMulti<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>() {
                   @Override
-                  public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, BoundMulti<?, ?>> key)
+                  public ThreadLocal<OldDoFn<?, ?>> load(
+                      AppliedPTransform<?, ?, BoundMulti<?, ?>> key)
                       throws Exception {
                     @SuppressWarnings({"unchecked", "rawtypes"})
                     ThreadLocal threadLocal =
@@ -76,7 +77,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
     Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll();
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    ThreadLocal<DoFn<InT, OuT>> fnLocal =
+    ThreadLocal<OldDoFn<InT, OuT>> fnLocal =
         (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
     String stepName = evaluationContext.getStepName(application);
     DirectStepContext stepContext =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 044abdc..53af6af 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,16 +38,17 @@ import java.util.Collections;
  * {@link Bound ParDo.Bound} primitive {@link PTransform}.
  */
 class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
-  private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>>
+  private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>
       fnClones;
 
   public ParDoSingleEvaluatorFactory() {
     fnClones =
         CacheBuilder.newBuilder()
             .build(
-                new CacheLoader<AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<DoFn<?, ?>>>() {
+                new CacheLoader<
+                    AppliedPTransform<?, ?, Bound<?, ?>>, ThreadLocal<OldDoFn<?, ?>>>() {
                   @Override
-                  public ThreadLocal<DoFn<?, ?>> load(AppliedPTransform<?, ?, Bound<?, ?>> key)
+                  public ThreadLocal<OldDoFn<?, ?>> load(AppliedPTransform<?, ?, Bound<?, ?>> key)
                       throws Exception {
                     @SuppressWarnings({"unchecked", "rawtypes"})
                     ThreadLocal threadLocal =
@@ -80,7 +81,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
             .getOrCreateStepContext(stepName, stepName);
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
+    ThreadLocal<OldDoFn<InputT, OutputT>> fnLocal =
         (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
     try {
       ParDoEvaluator<InputT> parDoEvaluator =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index 7fac1e3..d021b43 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 
 import javax.annotation.Nullable;
@@ -38,8 +38,8 @@ public interface TransformEvaluatorFactory {
    * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
    *
    * <p>Any work that must be done before input elements are processed (such as calling
-   * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
-   * made available to the caller.
+   * {@link OldDoFn#startBundle(OldDoFn.Context)}) must be done before the
+   * {@link TransformEvaluator} is made available to the caller.
    *
    * <p>May return null if the application cannot produce an evaluator (for example, it is a
    * {@link Read} {@link PTransform} where all evaluators are in-use).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index d6ee6ea..cee4001 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -23,9 +23,9 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.io.Write.Bound;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
@@ -101,7 +101,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
   }
 
   @VisibleForTesting
-  static class KeyBasedOnCountFn<T> extends DoFn<T, KV<Integer, T>> {
+  static class KeyBasedOnCountFn<T> extends OldDoFn<T, KV<Integer, T>> {
     @VisibleForTesting
     static final int MIN_SHARDS_FOR_LOG = 3;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
index 353eef6..529316c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitorTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -62,9 +62,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
         p.apply("listCreate", Create.of("foo", "bar"))
             .apply(
                 ParDo.of(
-                    new DoFn<String, String>() {
+                    new OldDoFn<String, String>() {
                       @Override
-                      public void processElement(DoFn<String, String>.ProcessContext c)
+                      public void processElement(OldDoFn<String, String>.ProcessContext c)
                           throws Exception {
                         c.output(Integer.toString(c.element().length()));
                       }
@@ -109,9 +109,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     PCollection<String> transformed =
         created.apply(
             ParDo.of(
-                new DoFn<String, String>() {
+                new OldDoFn<String, String>() {
                   @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
+                  public void processElement(OldDoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }
@@ -140,9 +140,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     PCollection<String> transformed =
         created.apply(
             ParDo.of(
-                new DoFn<String, String>() {
+                new OldDoFn<String, String>() {
                   @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
+                  public void processElement(OldDoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }
@@ -157,9 +157,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     p.apply(Create.of("1", "2", "3"))
         .apply(
             ParDo.of(
-                new DoFn<String, String>() {
+                new OldDoFn<String, String>() {
                   @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
+                  public void processElement(OldDoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }
@@ -182,9 +182,9 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
     PCollection<String> transformed =
         created.apply(
             ParDo.of(
-                new DoFn<String, String>() {
+                new OldDoFn<String, String>() {
                   @Override
-                  public void processElement(DoFn<String, String>.ProcessContext c)
+                  public void processElement(OldDoFn<String, String>.ProcessContext c)
                       throws Exception {
                     c.output(Integer.toString(c.element().length()));
                   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 09707bd..29dea32 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -32,9 +32,9 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -159,7 +159,7 @@ public class DirectRunnerTest implements Serializable {
 
   @Test
   public void transformDisplayDataExceptionShouldFail() {
-    DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
+    OldDoFn<Integer, Integer> brokenDoFn = new OldDoFn<Integer, Integer>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {}
 
@@ -211,7 +211,7 @@ public class DirectRunnerTest implements Serializable {
 
 
   /**
-   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+   * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
    * {@link DirectRunner}.
    */
   @Test
@@ -220,7 +220,7 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+        .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
           @Override public void processElement(ProcessContext c) {
             List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
             c.output(outputList);
@@ -236,7 +236,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+   * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the
    * {@link DirectRunner}.
    */
   @Test
@@ -245,7 +245,7 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+        .apply(ParDo.of(new OldDoFn<Integer, List<Integer>>() {
           @Override public void processElement(ProcessContext c) {
             List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
             c.output(outputList);
@@ -260,7 +260,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
+   * Tests that a {@link OldDoFn} that mutates an output with a bad equals() still fails
    * in the {@link DirectRunner}.
    */
   @Test
@@ -269,7 +269,7 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, byte[]>() {
+        .apply(ParDo.of(new OldDoFn<Integer, byte[]>() {
           @Override public void processElement(ProcessContext c) {
             byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
             c.output(outputArray);
@@ -285,7 +285,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
+   * Tests that a {@link OldDoFn} that mutates its input with a good equals() fails in the
    * {@link DirectRunner}.
    */
   @Test
@@ -295,7 +295,7 @@ public class DirectRunnerTest implements Serializable {
     pipeline
         .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
             .withCoder(ListCoder.of(VarIntCoder.of())))
-        .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
+        .apply(ParDo.of(new OldDoFn<List<Integer>, Integer>() {
           @Override public void processElement(ProcessContext c) {
             List<Integer> inputList = c.element();
             inputList.set(0, 37);
@@ -310,7 +310,7 @@ public class DirectRunnerTest implements Serializable {
   }
 
   /**
-   * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
+   * Tests that a {@link OldDoFn} that mutates an input with a bad equals() still fails
    * in the {@link DirectRunner}.
    */
   @Test
@@ -319,7 +319,7 @@ public class DirectRunnerTest implements Serializable {
 
     pipeline
         .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
-        .apply(ParDo.of(new DoFn<byte[], Integer>() {
+        .apply(ParDo.of(new OldDoFn<byte[], Integer>() {
           @Override public void processElement(ProcessContext c) {
             byte[] inputArray = c.element();
             inputArray[0] = 0xa;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index d40cf93..db934e5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -213,9 +213,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
     CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
   }
 
-  private static class IdentityDoFn<T> extends DoFn<T, T> {
+  private static class IdentityDoFn<T> extends OldDoFn<T, T> {
     @Override
-    public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+    public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
       c.output(c.element());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 890e06d..e1be120 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -59,9 +59,9 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
         p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
             .apply(
                 ParDo.of(
-                    new DoFn<byte[], byte[]>() {
+                    new OldDoFn<byte[], byte[]>() {
                       @Override
-                      public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
+                      public void processElement(OldDoFn<byte[], byte[]>.ProcessContext c)
                           throws Exception {
                         c.element()[0] = 'b';
                       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index aa0d976..9e273ad 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -180,9 +180,9 @@ public class KeyedPValueTrackingVisitorTest {
     }
   }
 
-  private static class IdentityFn<K> extends DoFn<K, K> {
+  private static class IdentityFn<K> extends OldDoFn<K, K> {
     @Override
-    public void processElement(DoFn<K, K>.ProcessContext c) throws Exception {
+    public void processElement(OldDoFn<K, K>.ProcessContext c) throws Exception {
       c.output(c.element());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 07f478d..3208841 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -30,7 +30,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -169,7 +169,7 @@ public class ParDoEvaluatorTest {
         ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
   }
 
-  private static class RecorderFn extends DoFn<Integer, Integer> {
+  private static class RecorderFn extends OldDoFn<Integer, Integer> {
     private Collection<Integer> processed;
     private final PCollectionView<Integer> view;
 
@@ -179,7 +179,7 @@ public class ParDoEvaluatorTest {
     }
 
     @Override
-    public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception {
+    public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
       processed.add(c.element());
       c.output(c.element() + c.sideInput(view));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index c0ab4df..19094cb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -80,7 +80,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
+                new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(KV.<String, Integer>of(c.element(), c.element().length()));
@@ -170,7 +170,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
+                new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(KV.<String, Integer>of(c.element(), c.element().length()));
@@ -254,7 +254,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
         StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
+                new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.windowingInternals()
@@ -354,7 +354,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     BoundMulti<String, KV<String, Integer>> pardo =
         ParDo.of(
-                new DoFn<String, KV<String, Integer>>() {
+                new OldDoFn<String, KV<String, Integer>>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.windowingInternals().stateInternals();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index d778da6..a4fd570 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -73,7 +73,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     PCollection<Integer> collection =
         input.apply(
             ParDo.of(
-                new DoFn<String, Integer>() {
+                new OldDoFn<String, Integer>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.output(c.element().length());
@@ -127,7 +127,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     PCollection<Integer> collection =
         input.apply(
             ParDo.of(
-                new DoFn<String, Integer>() {
+                new OldDoFn<String, Integer>() {
                   @Override
                   public void processElement(ProcessContext c) {
                     c.sideOutput(sideOutputTag, c.element().length());
@@ -179,7 +179,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
         StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
     ParDo.Bound<String, KV<String, Integer>> pardo =
         ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
+            new OldDoFn<String, KV<String, Integer>>() {
               @Override
               public void processElement(ProcessContext c) {
                 c.windowingInternals()
@@ -265,7 +265,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     ParDo.Bound<String, KV<String, Integer>> pardo =
         ParDo.of(
-            new DoFn<String, KV<String, Integer>>() {
+            new OldDoFn<String, KV<String, Integer>>() {
               @Override
               public void processElement(ProcessContext c) {
                 c.windowingInternals().stateInternals();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 7c7005c..22f148a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -38,9 +38,9 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -76,7 +76,6 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
-
 import javax.annotation.Nullable;
 
 /**
@@ -105,9 +104,9 @@ public class WatermarkManagerTest implements Serializable {
     createdInts = p.apply("createdInts", Create.of(1, 2, 3));
 
     filtered = createdInts.apply("filtered", Filter.greaterThan(1));
-    filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() {
+    filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new OldDoFn<Integer, Integer>() {
       @Override
-      public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception {
+      public void processElement(OldDoFn<Integer, Integer>.ProcessContext c) throws Exception {
         c.output(c.element() * 2);
       }
     }));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 56737a4..716c8ad 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -230,7 +230,7 @@ public class TFIDF {
       // Create a collection of pairs mapping a URI to each
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
+          .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() {
             private static final long serialVersionUID = 0;
 
             @Override
@@ -275,7 +275,7 @@ public class TFIDF {
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
           .apply("ShiftKeys", ParDo.of(
-              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+              new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                 private static final long serialVersionUID = 0;
 
                 @Override
@@ -316,7 +316,7 @@ public class TFIDF {
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
           .apply("ComputeTermFrequencies", ParDo.of(
-              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 private static final long serialVersionUID = 0;
 
                 @Override
@@ -339,11 +339,11 @@ public class TFIDF {
       // documents in which the word appears divided by the total
       // number of documents in the corpus. Note how the total number of
       // documents is passed as a side input; the same value is
-      // presented to each invocation of the DoFn.
+      // presented to each invocation of the OldDoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
-              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+              .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
                 private static final long serialVersionUID = 0;
 
                 @Override
@@ -375,7 +375,7 @@ public class TFIDF {
 
       return wordToUriAndTfAndDf
           .apply("ComputeTfIdf", ParDo.of(
-              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 private static final long serialVersionUID = 0;
 
                 @Override
@@ -416,7 +416,7 @@ public class TFIDF {
     @Override
     public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
-          .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+          .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
             private static final long serialVersionUID = 0;
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index c54229d..080cdc9 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.PCollection;
 
 public class WordCount {
 
-  public static class ExtractWordsFn extends DoFn<String, String> {
+  public static class ExtractWordsFn extends OldDoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index c0ff85d..068404a 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -92,7 +92,7 @@ public class AutoComplete {
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(
-            new DoFn<KV<String, Long>, CompletionCandidate>() {
+            new OldDoFn<KV<String, Long>, CompletionCandidate>() {
               private static final long serialVersionUID = 0;
 
               @Override
@@ -182,7 +182,7 @@ public class AutoComplete {
     }
 
     private static class FlattenTops
-        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+        extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
       private static final long serialVersionUID = 0;
 
       @Override
@@ -236,10 +236,10 @@ public class AutoComplete {
   }
 
   /**
-   * A DoFn that keys each candidate by all its prefixes.
+   * A OldDoFn that keys each candidate by all its prefixes.
    */
   private static class AllPrefixes
-      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+      extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
     private static final long serialVersionUID = 0;
 
     private final int minPrefix;
@@ -314,7 +314,7 @@ public class AutoComplete {
     }
   }
 
-  static class ExtractWordsFn extends DoFn<String, String> {
+  static class ExtractWordsFn extends OldDoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
             createAggregator("emptyLines", new Sum.SumLongFn());
 
@@ -340,8 +340,8 @@ public class AutoComplete {
    * Takes as input a the top candidates per prefix, and emits an entity
    * suitable for writing to Datastore.
    */
-  static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
-          implements DoFn.RequiresWindowAccess{
+  static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String>
+          implements OldDoFn.RequiresWindowAccess{
 
     private static final long serialVersionUID = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index f456b27..7d7c0c7 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -76,7 +76,7 @@ public class JoinExamples {
     // country code 'key' -> string of <event info>, <country name>
     PCollection<KV<String, String>> finalResultCollection =
         kvpCollection.apply("Process", ParDo.of(
-            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+            new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
               private static final long serialVersionUID = 0;
 
               @Override
@@ -98,7 +98,7 @@ public class JoinExamples {
             }));
 
     return finalResultCollection
-        .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
+        .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
           private static final long serialVersionUID = 0;
 
           @Override
@@ -110,7 +110,7 @@ public class JoinExamples {
         }));
   }
 
-  static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
+  static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> {
     private static final long serialVersionUID = 0;
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 8756abe..395b409 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -326,7 +326,7 @@ public class KafkaIOExamples {
    * Print contents to stdout
    * @param <T> type of the input
    */
-  private static class PrintFn<T> extends DoFn<T, T> {
+  private static class PrintFn<T> extends OldDoFn<T, T> {
 
     @Override
     public void processElement(ProcessContext c) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 4e81420..8c31783 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -49,7 +49,7 @@ public class KafkaWindowedWordCountExample {
   static final String GROUP_ID = "myGroup";  // Default groupId
   static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
 
-  public static class ExtractWordsFn extends DoFn<String, String> {
+  public static class ExtractWordsFn extends OldDoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
@@ -71,7 +71,7 @@ public class KafkaWindowedWordCountExample {
     }
   }
 
-  public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+  public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
     @Override
     public void processElement(ProcessContext c) {
       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index 1b532a7..d149e4e 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -59,7 +59,7 @@ public class WindowedWordCount {
   static final long WINDOW_SIZE = 10;  // Default window duration in seconds
   static final long SLIDE_SIZE = 5;  // Default window slide in seconds
 
-  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+  static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
     @Override
     public void processElement(ProcessContext c) {
       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
@@ -67,7 +67,7 @@ public class WindowedWordCount {
     }
   }
 
-  static class ExtractWordsFn extends DoFn<String, String> {
+  static class ExtractWordsFn extends OldDoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 0bba0d0..01a3ab2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -391,7 +391,7 @@ class FlinkBatchTransformTranslators {
           inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
 
       // construct a map from side input to WindowingStrategy so that
-      // the DoFn runner can map main-input windows to side input windows
+      // the OldDoFn runner can map main-input windows to side input windows
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
       for (PCollectionView<?> sideInput: transform.getSideInputs()) {
         sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
@@ -494,7 +494,7 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final DoFn<InputT, OutputT> doFn = transform.getFn();
+      final OldDoFn<InputT, OutputT> doFn = transform.getFn();
 
       TypeInformation<WindowedValue<OutputT>> typeInformation =
           context.getTypeInfo(context.getOutput(transform));
@@ -502,7 +502,7 @@ class FlinkBatchTransformTranslators {
       List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
       // construct a map from side input to WindowingStrategy so that
-      // the DoFn runner can map main-input windows to side input windows
+      // the OldDoFn runner can map main-input windows to side input windows
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
       for (PCollectionView<?> sideInput: sideInputs) {
         sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
@@ -539,7 +539,7 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final DoFn<InputT, OutputT> doFn = transform.getFn();
+      final OldDoFn<InputT, OutputT> doFn = transform.getFn();
 
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
 
@@ -578,7 +578,7 @@ class FlinkBatchTransformTranslators {
       List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
       // construct a map from side input to WindowingStrategy so that
-      // the DoFn runner can map main-input windows to side input windows
+      // the OldDoFn runner can map main-input windows to side input windows
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
       for (PCollectionView<?> sideInput: sideInputs) {
         sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index fa6b387..5b55d42 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -35,11 +35,10 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -71,8 +70,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -346,8 +343,8 @@ public class FlinkStreamingTransformTranslators {
       context.setOutputDataStream(context.getOutput(transform), windowedStream);
     }
 
-    private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
-      return new DoFn<T, T>() {
+    private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
+      return new OldDoFn<T, T>() {
 
         @Override
         public void processElement(final ProcessContext c) throws Exception {