Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:52:24 UTC

[06/51] [abbrv] incubator-beam git commit: Rename DoFn to OldDoFn
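
This commit frees the name DoFn for the new annotation-driven class: the existing override-based class becomes OldDoFn, and every call site below is updated mechanically. A minimal sketch contrasting the two shapes, assuming the 0.2.0-incubating era SDK (class names here are hypothetical; the OldDoFn form matches the call sites in this diff, while the new-style class is shown only for contrast and is not part of this commit):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.OldDoFn;

    // Override-based style, now named OldDoFn: subclass and override processElement.
    class ExtractWordsOldStyle extends OldDoFn<String, String> {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        for (String word : c.element().split("\\s+")) {
          c.output(word);
        }
      }
    }

    // Annotation-driven style that takes over the DoFn name: the processing
    // method is located via @ProcessElement rather than by override.
    class ExtractWordsNewStyle extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (String word : c.element().split("\\s+")) {
          c.output(word);
        }
      }
    }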

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 89243a3..a4af1b0 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -30,13 +30,13 @@ import org.apache.flink.util.Collector;
 import java.util.Map;
 
 /**
- * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn}
+ * Encapsulates a {@link OldDoFn}
  * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
  */
 public class FlinkDoFnFunction<InputT, OutputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
 
-  private final DoFn<InputT, OutputT> doFn;
+  private final OldDoFn<InputT, OutputT> doFn;
   private final SerializedPipelineOptions serializedOptions;
 
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -47,7 +47,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public FlinkDoFnFunction(
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options) {
@@ -56,7 +56,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
 
-    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
     this.hasSideInputs = !sideInputs.isEmpty();
   }
 

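The requiresWindowAccess flag above comes from an instanceof check against the OldDoFn.RequiresWindowAccess marker interface; only functions implementing it may call window() from processElement(). A minimal sketch of the pattern being tested for (hypothetical class, not from this commit):

    import org.apache.beam.sdk.transforms.OldDoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

    // The marker interface signals to the runner that processElement()
    // will call window().
    class TagWithWindow extends OldDoFn<String, String>
        implements OldDoFn.RequiresWindowAccess {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        BoundedWindow w = c.window(); // legal only because of the marker above
        c.output(w + ": " + c.element());
      }
    }
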
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 9074d72..2d36043 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
@@ -60,7 +60,7 @@ public class FlinkMergingNonShuffleReduceFunction<
 
   private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
 
-  private final DoFn<KV<K, InputT>, KV<K, OutputT>> doFn;
+  private final OldDoFn<KV<K, InputT>, KV<K, OutputT>> doFn;
 
   private final WindowingStrategy<?, W> windowingStrategy;
 
@@ -81,8 +81,8 @@ public class FlinkMergingNonShuffleReduceFunction<
 
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
-    // dummy DoFn because we need one for ProcessContext
-    this.doFn = new DoFn<KV<K, InputT>, KV<K, OutputT>>() {
+    // dummy OldDoFn because we need one for ProcessContext
+    this.doFn = new OldDoFn<KV<K, InputT>, KV<K, OutputT>>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
 

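The "dummy OldDoFn" above exists because ProcessContext is a non-static inner class of OldDoFn: it cannot be constructed without an enclosing OldDoFn instance, even though here the actual element processing is done by the CombineFn and processElement() stays empty. A minimal sketch of the idiom (hypothetical holder class; the same pattern reappears in FlinkPartialReduceFunction and FlinkReduceFunction below):

    import org.apache.beam.sdk.transforms.OldDoFn;
    import org.apache.beam.sdk.values.KV;

    class DummyDoFnHolder {
      // A no-op OldDoFn whose only role is to be the enclosing instance
      // that a ProcessContext requires; the real work is done by a CombineFn.
      final OldDoFn<KV<String, Long>, KV<String, Long>> doFn =
          new OldDoFn<KV<String, Long>, KV<String, Long>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
              // intentionally empty
            }
          };
    }
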
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index f92e76f..6e673fc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -32,7 +32,7 @@ import org.apache.flink.util.Collector;
 import java.util.Map;
 
 /**
- * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn} that uses side outputs
+ * Encapsulates a {@link OldDoFn} that uses side outputs
  * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
  *
  * We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
@@ -42,7 +42,7 @@ import java.util.Map;
 public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<RawUnionValue>> {
 
-  private final DoFn<InputT, OutputT> doFn;
+  private final OldDoFn<InputT, OutputT> doFn;
   private final SerializedPipelineOptions serializedOptions;
 
   private final Map<TupleTag<?>, Integer> outputMap;
@@ -55,7 +55,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public FlinkMultiOutputDoFnFunction(
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options,
@@ -64,7 +64,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.outputMap = outputMap;
 
-    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
     this.hasSideInputs = !sideInputs.isEmpty();
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
index 71b6d27..fab3c85 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -35,7 +35,7 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * {@link DoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports
+ * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports
  * side outputs.
  */
 class FlinkMultiOutputProcessContext<InputT, OutputT>
@@ -50,7 +50,7 @@ class FlinkMultiOutputProcessContext<InputT, OutputT>
   FlinkMultiOutputProcessContext(
       PipelineOptions pipelineOptions,
       RuntimeContext runtimeContext,
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Collector<WindowedValue<RawUnionValue>> collector,
       Map<TupleTag<?>, Integer> outputMap,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index d49821b..98446f9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -17,18 +17,16 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
 import org.joda.time.Instant;
 
-import java.util.Collection;
-
 /**
  * {@link WindowFn.AssignContext} for calling a {@link WindowFn} for elements emitted from
- * {@link org.apache.beam.sdk.transforms.DoFn#startBundle(DoFn.Context)}
- * or {@link DoFn#finishBundle(DoFn.Context)}.
+ * {@link OldDoFn#startBundle(OldDoFn.Context)}
+ * or {@link OldDoFn#finishBundle(OldDoFn.Context)}.
  *
  * <p>In those cases the {@code WindowFn} is not allowed to access any element information.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index c29e1df..2db4b7b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -58,7 +58,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
 
   protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
 
-  protected final DoFn<KV<K, InputT>, KV<K, AccumT>> doFn;
+  protected final OldDoFn<KV<K, InputT>, KV<K, AccumT>> doFn;
 
   protected final WindowingStrategy<?, W> windowingStrategy;
 
@@ -77,8 +77,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
-    // dummy DoFn because we need one for ProcessContext
-    this.doFn = new DoFn<KV<K, InputT>, KV<K, AccumT>>() {
+    // dummy OldDoFn because we need one for ProcessContext
+    this.doFn = new OldDoFn<KV<K, InputT>, KV<K, AccumT>>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index 235a803..3954d1f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimerInternals;
@@ -48,10 +48,10 @@ import java.util.Iterator;
 import java.util.Map;
 
 /**
- * {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext} for our Flink Wrappers.
+ * {@link OldDoFn.ProcessContext} for our Flink Wrappers.
  */
 class FlinkProcessContext<InputT, OutputT>
-    extends DoFn<InputT, OutputT>.ProcessContext {
+    extends OldDoFn<InputT, OutputT>.ProcessContext {
 
   private final PipelineOptions pipelineOptions;
   private final RuntimeContext runtimeContext;
@@ -67,7 +67,7 @@ class FlinkProcessContext<InputT, OutputT>
   FlinkProcessContext(
       PipelineOptions pipelineOptions,
       RuntimeContext runtimeContext,
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Collector<WindowedValue<OutputT>> collector,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
@@ -80,7 +80,7 @@ class FlinkProcessContext<InputT, OutputT>
     this.pipelineOptions = pipelineOptions;
     this.runtimeContext = runtimeContext;
     this.collector = collector;
-    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
 
@@ -90,7 +90,7 @@ class FlinkProcessContext<InputT, OutputT>
   FlinkProcessContext(
       PipelineOptions pipelineOptions,
       RuntimeContext runtimeContext,
-      DoFn<InputT, OutputT> doFn,
+      OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
     doFn.super();
@@ -101,7 +101,7 @@ class FlinkProcessContext<InputT, OutputT>
     this.pipelineOptions = pipelineOptions;
     this.runtimeContext = runtimeContext;
     this.collector = null;
-    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
 
@@ -141,7 +141,7 @@ class FlinkProcessContext<InputT, OutputT>
   public BoundedWindow window() {
     if (!requiresWindowAccess) {
       throw new UnsupportedOperationException(
-          "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+          "window() is only available in the context of an OldDoFn marked as RequiresWindowAccess.");
     }
     return Iterables.getOnlyElement(windowedValue.getWindows());
   }

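FlinkProcessContext subclasses the inner class OldDoFn<InputT, OutputT>.ProcessContext from outside OldDoFn, so its constructors must bind an enclosing instance with the qualified superclass constructor call doFn.super(). A self-contained sketch of that Java idiom, using illustrative Outer/Inner names rather than Beam types:

    // Standalone illustration (plain Java) of the idiom behind doFn.super()
    // in the constructors above.
    class Outer {
      class Inner {}
    }

    class InnerSubclass extends Outer.Inner {
      InnerSubclass(Outer enclosing) {
        // Qualified superclass constructor invocation: binds 'enclosing' as
        // the Outer instance of this Inner, exactly as doFn.super() binds
        // the OldDoFn for the ProcessContext.
        enclosing.super();
      }
    }
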
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 9cbc6b9..b1729a4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.functions;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -60,7 +60,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
   protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
 
-  protected final DoFn<KV<K, AccumT>, KV<K, OutputT>> doFn;
+  protected final OldDoFn<KV<K, AccumT>, KV<K, OutputT>> doFn;
 
   protected final WindowingStrategy<?, W> windowingStrategy;
 
@@ -81,8 +81,8 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
-    // dummy DoFn because we need one for ProcessContext
-    this.doFn = new DoFn<KV<K, AccumT>, KV<K, OutputT>>() {
+    // dummy OldDoFn because we need one for ProcessContext
+    this.doFn = new OldDoFn<KV<K, AccumT>, KV<K, OutputT>>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index e40d6e3..74ec66a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -24,7 +24,7 @@ import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregat
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -52,13 +52,13 @@ import java.util.Collection;
  * */
 public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
 
-  private final DoFn<IN, OUTDF> doFn;
+  private final OldDoFn<IN, OUTDF> doFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final SerializedPipelineOptions serializedPipelineOptions;
 
   private DoFnProcessContext context;
 
-  public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
+  public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUTDF> doFn) {
     checkNotNull(options);
     checkNotNull(windowingStrategy);
     checkNotNull(doFn);
@@ -104,15 +104,15 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
     doFn.processElement(this.context);
   }
 
-  private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
+  private class DoFnProcessContext extends OldDoFn<IN, OUTDF>.ProcessContext {
 
-    private final DoFn<IN, OUTDF> fn;
+    private final OldDoFn<IN, OUTDF> fn;
 
     protected final Collector<WindowedValue<OUTFL>> collector;
 
     private WindowedValue<IN> element;
 
-    private DoFnProcessContext(DoFn<IN, OUTDF> function,
+    private DoFnProcessContext(OldDoFn<IN, OUTDF> function,
           Collector<WindowedValue<OUTFL>> outCollector) {
       function.super();
       super.setupDelegateAggregators();
@@ -137,9 +137,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
 
     @Override
     public BoundedWindow window() {
-      if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
         throw new UnsupportedOperationException(
-            "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+            "window() is only available in the context of an OldDoFn marked as RequiresWindowAccess.");
       }
 
       Collection<? extends BoundedWindow> windows = this.element.getWindows();
@@ -211,7 +211,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
       throw new IllegalArgumentException(String.format(
           "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
               + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-              + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
+              + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
           timestamp, ref.getTimestamp(),
           PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
     }

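The timestamp-skew error above is raised when outputWithTimestamp() is given a timestamp earlier than the input's by more than OldDoFn#getAllowedTimestampSkew(), which defaults to Duration.ZERO. A minimal sketch of widening the skew (hypothetical class, assuming the era's Joda-Time-based API):

    import org.apache.beam.sdk.transforms.OldDoFn;
    import org.joda.time.Duration;

    class ShiftBack extends OldDoFn<String, String> {
      @Override
      public Duration getAllowedTimestampSkew() {
        // Permit outputs up to one minute earlier than the input element.
        return Duration.standardMinutes(1);
      }

      @Override
      public void processElement(ProcessContext c) throws Exception {
        c.outputWithTimestamp(
            c.element(), c.timestamp().minus(Duration.standardSeconds(30)));
      }
    }
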
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0e977db..103a12b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -112,7 +112,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
 
   private transient CoderRegistry coderRegistry;
 
-  private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
+  private OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
 
   private ProcessContext context;
 
@@ -263,7 +263,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
    * a function with that combiner is created, so that elements are combined as they arrive. This is
    * done for speed and (in most of the cases) for reduction of the per-window state.
    */
-  private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
+  private <W extends BoundedWindow> OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
     if (this.operator == null) {
 
       StateInternalsFactory<K> stateInternalsFactory = new GroupAlsoByWindowWrapperStateInternalsFactory();
@@ -272,7 +272,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
         // Thus VOUT == Iterable<VIN>
         Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
 
-        this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
+        this.operator = (OldDoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
             (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
       } else {
         Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
@@ -446,7 +446,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
 
     private KeyedWorkItem<K, VIN> element;
 
-    public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
+    public ProcessContext(OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
                           TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
                           FlinkTimerInternals timerInternals) {
       function.super();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
index 619b887..0ea0cab 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -40,7 +40,7 @@ public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrap
   private final TupleTag<?> mainTag;
   private final Map<TupleTag<?>, Integer> outputLabels;
 
-  public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+  public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
     super(options, windowingStrategy, doFn);
     this.mainTag = checkNotNull(mainTag);
     this.outputLabels = checkNotNull(tagsToLabels);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
index 4def0c6..6be94b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimerInternals;
@@ -41,7 +41,7 @@ import java.util.Collection;
  * */
 public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
 
-  public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
+  public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUT> doFn) {
     super(options, windowingStrategy, doFn);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
index 9e55002..a0b33f8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimerInternals;
 
@@ -106,7 +106,7 @@ public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerIntern
     }
   }
 
-  public void encodeTimerInternals(DoFn.ProcessContext context,
+  public void encodeTimerInternals(OldDoFn.ProcessContext context,
                                    StateCheckpointWriter writer,
                                    KvCoder<K, VIN> kvCoder,
                                    Coder<? extends BoundedWindow> windowCoder) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 61e219c..c24d91d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -114,7 +114,7 @@ public class PipelineOptionsTest {
   }
 
 
-  private static class TestDoFn extends DoFn<Object, Object> {
+  private static class TestDoFn extends OldDoFn<Object, Object> {
 
     @Override
     public void processElement(ProcessContext c) throws Exception {
@@ -126,7 +126,7 @@ public class PipelineOptionsTest {
   }
 
   private static class TestParDoWrapper extends FlinkAbstractParDoWrapper {
-    public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, DoFn doFn) {
+    public TestParDoWrapper(PipelineOptions options, WindowingStrategy windowingStrategy, OldDoFn doFn) {
       super(options, windowingStrategy, doFn);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index bb79b27..ca70096 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -72,7 +72,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
 
     PCollection<String> result = p
         .apply(CountingInput.upTo(10))
-        .apply(ParDo.of(new DoFn<Long, String>() {
+        .apply(ParDo.of(new OldDoFn<Long, String>() {
           @Override
           public void processElement(ProcessContext c) throws Exception {
             c.output(c.element().toString());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index fe71802..bc69f34 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import com.google.common.base.Joiner;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
@@ -59,7 +59,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
 
     p
       .apply(CountingInput.upTo(10))
-      .apply(ParDo.of(new DoFn<Long, String>() {
+      .apply(ParDo.of(new OldDoFn<Long, String>() {
           @Override
           public void processElement(ProcessContext c) throws Exception {
             c.output(c.element().toString());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 1b55c61..ca183a8 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.flink.FlinkTestPipeline;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -61,7 +61,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
     compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
   }
 
-  public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
+  public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, String>, String> {
     private static final long serialVersionUID = 0;
 
     @Override
@@ -97,7 +97,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
               .withAllowedLateness(Duration.ZERO)
               .discardingFiredPanes())
 
-          .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+          .apply(ParDo.of(new OldDoFn<String, KV<Void, String>>() {
             @Override
             public void processElement(ProcessContext c) throws Exception {
               String elem = c.element();
@@ -105,7 +105,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
             }
           }))
           .apply(GroupByKey.<Void, String>create())
-          .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+          .apply(ParDo.of(new OldDoFn<KV<Void, Iterable<String>>, String>() {
             @Override
             public void processElement(ProcessContext c) throws Exception {
               KV<Void, Iterable<String>> elem = c.element();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 1efb42f..7912aee 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -103,7 +103,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
 
 
 
-      .apply(ParDo.of(new DoFn<TableRow, String>() {
+      .apply(ParDo.of(new OldDoFn<TableRow, String>() {
         @Override
         public void processElement(ProcessContext c) throws Exception {
           TableRow row = c.element();
@@ -120,7 +120,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
 
       .apply(Count.<String>perElement());
 
-    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+    PCollection<String> format = output.apply(ParDo.of(new OldDoFn<KV<String, Long>, String>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
         KV<String, Long> el = c.element();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 7fd203f..ac06b52 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -47,9 +47,9 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -82,7 +82,6 @@ import com.google.api.services.dataflow.model.WorkerPool;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,7 +93,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
 import javax.annotation.Nullable;
 
 /**
@@ -1021,7 +1019,7 @@ public class DataflowPipelineTranslator {
   }
 
   private static void translateFn(
-      DoFn fn,
+      OldDoFn fn,
       WindowingStrategy windowingStrategy,
       Iterable<PCollectionView<?>> sideInputs,
       Coder inputCoder,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e7cc20e..d762d50 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -78,9 +78,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -143,7 +143,6 @@ import com.google.common.collect.Multimap;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -173,7 +172,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-
 import javax.annotation.Nullable;
 
 /**
@@ -762,13 +760,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> {
 
     /**
-     * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for
+     * A {@link OldDoFn} that for each element outputs a {@code KV} structure suitable for
      * grouping by the hash of the window's byte representation and sorting the grouped values
      * using the window's byte representation.
      */
     @SystemDoFnInternal
     private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow>
-        extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements DoFn.RequiresWindowAccess {
+        extends OldDoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements
+        OldDoFn.RequiresWindowAccess {
 
       private final IsmRecordCoder<?> ismCoderForHash;
       private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) {
@@ -828,15 +827,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       extends PTransform<PCollection<T>, PCollectionView<T>> {
 
     /**
-     * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
+     * A {@link OldDoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
      * <ul>
      *   <li>Key 1: Window
      *   <li>Value: Windowed value
      * </ul>
      */
     static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow>
-        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
-                     IsmRecord<WindowedValue<T>>> {
+        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+                             IsmRecord<WindowedValue<T>>> {
 
       private final Coder<W> windowCoder;
       IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) {
@@ -902,8 +901,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         applyForSingleton(
             DataflowRunner runner,
             PCollection<T> input,
-            DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
-                 IsmRecord<WindowedValue<FinalT>>> doFn,
+            OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+                             IsmRecord<WindowedValue<FinalT>>> doFn,
             boolean hasDefault,
             FinalT defaultValue,
             Coder<FinalT> defaultValueCoder) {
@@ -998,7 +997,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   static class BatchViewAsList<T>
       extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
     /**
-     * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
+     * A {@link OldDoFn} which creates {@link IsmRecord}s assuming that each element is within the
      * global window. Each {@link IsmRecord} has
      * <ul>
      *   <li>Key 1: Global window</li>
@@ -1008,7 +1007,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      */
     @SystemDoFnInternal
     static class ToIsmRecordForGlobalWindowDoFn<T>
-        extends DoFn<T, IsmRecord<WindowedValue<T>>> {
+        extends OldDoFn<T, IsmRecord<WindowedValue<T>>> {
 
       long indexInBundle;
       @Override
@@ -1030,7 +1029,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows
+     * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
      * to locate the window boundaries. The {@link IsmRecord} has:
      * <ul>
      *   <li>Key 1: Window</li>
@@ -1040,8 +1039,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      */
     @SystemDoFnInternal
     static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow>
-        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
-                     IsmRecord<WindowedValue<T>>> {
+        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+                             IsmRecord<WindowedValue<T>>> {
 
       private final Coder<W> windowCoder;
       ToIsmRecordForNonGlobalWindowDoFn(Coder<W> windowCoder) {
@@ -1174,7 +1173,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
 
     /**
-     * A {@link DoFn} which groups elements by window boundaries. For each group,
+     * A {@link OldDoFn} which groups elements by window boundaries. For each group,
      * the group of elements is transformed into a {@link TransformedMap}.
      * The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>}
      * and contains a function {@code WindowedValue<V> -> V}.
@@ -1188,10 +1187,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * </ul>
      */
     static class ToMapDoFn<K, V, W extends BoundedWindow>
-        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
-                     IsmRecord<WindowedValue<TransformedMap<K,
-                                             WindowedValue<V>,
-                                             V>>>> {
+        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+                             IsmRecord<WindowedValue<TransformedMap<K,
+                                                     WindowedValue<V>,
+                                                     V>>>> {
 
       private final Coder<W> windowCoder;
       ToMapDoFn(Coder<W> windowCoder) {
@@ -1358,8 +1357,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @SystemDoFnInternal
       private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>
-          extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
-          implements DoFn.RequiresWindowAccess {
+          extends OldDoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
+          implements OldDoFn.RequiresWindowAccess {
 
         private final IsmRecordCoder<?> coder;
         private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) {
@@ -1412,7 +1411,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows
+     * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
      * and keys to locate window and key boundaries. The main output {@link IsmRecord}s have:
      * <ul>
      *   <li>Key 1: Window</li>
@@ -1424,12 +1423,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * <p>Additionally, we output all the unique keys per window seen to {@code outputForEntrySet}
      * and the unique key count per window to {@code outputForSize}.
      *
-     * <p>Finally, if this DoFn has been requested to perform unique key checking, it will
+     * <p>Finally, if this OldDoFn has been requested to perform unique key checking, it will
      * throw an {@link IllegalStateException} if more than one key per window is found.
      */
     static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow>
-        extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
-                     IsmRecord<WindowedValue<V>>> {
+        extends OldDoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
+                             IsmRecord<WindowedValue<V>>> {
 
       private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize;
       private final TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet;
@@ -1557,7 +1556,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of:
+     * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window of:
        * <ul>
        *   <li>Key 1: META key</li>
        *   <li>Key 2: window</li>
@@ -1565,11 +1564,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
        *   <li>Value: sum of values for window</li>
        * </ul>
        *
-       * <p>This {@link DoFn} is meant to be used to compute the number of unique keys
+       * <p>This {@link OldDoFn} is meant to be used to compute the number of unique keys
        * per window for map and multimap side inputs.
        */
     static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow>
-        extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
+        extends OldDoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
       private final Coder<W> windowCoder;
       ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) {
         this.windowCoder = windowCoder;
@@ -1606,7 +1605,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
+     * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
        * <ul>
        *   <li>Key 1: META key</li>
        *   <li>Key 2: window</li>
@@ -1614,11 +1613,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
        *   <li>Value: key</li>
        * </ul>
        *
-       * <p>This {@link DoFn} is meant to be used to output index to key records
+       * <p>This {@link OldDoFn} is meant to be used to output index to key records
        * per window for map and multimap side inputs.
        */
     static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow>
-        extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
+        extends OldDoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
 
       private final Coder<K> keyCoder;
       private final Coder<W> windowCoder;
@@ -1658,7 +1657,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link DoFn} which partitions sets of elements by window boundaries. Within each
+     * A {@link OldDoFn} which partitions sets of elements by window boundaries. Within each
      * partition, the set of elements is transformed into a {@link TransformedMap}.
      * The transformed {@code Map<K, Iterable<V>>} is backed by a
      * {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function
@@ -1673,10 +1672,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * </ul>
      */
     static class ToMultimapDoFn<K, V, W extends BoundedWindow>
-        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
-                     IsmRecord<WindowedValue<TransformedMap<K,
-                                                            Iterable<WindowedValue<V>>,
-                                                            Iterable<V>>>>> {
+        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+                             IsmRecord<WindowedValue<TransformedMap<K,
+                                                                    Iterable<WindowedValue<V>>,
+                                                                    Iterable<V>>>>> {
 
       private final Coder<W> windowCoder;
       ToMultimapDoFn(Coder<W> windowCoder) {
@@ -2335,7 +2334,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           // WindmillSink.
           .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
           .apply("StripIds", ParDo.of(
-              new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
+              new OldDoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   c.output(c.element().getValue().getValue());
@@ -2372,11 +2371,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * A specialized {@link DoFn} for writing the contents of a {@link PCollection}
+   * A specialized {@link OldDoFn} for writing the contents of a {@link PCollection}
    * to a streaming {@link PCollectionView} backend implementation.
    */
   private static class StreamingPCollectionViewWriterFn<T>
-  extends DoFn<Iterable<T>, T> implements DoFn.RequiresWindowAccess {
+  extends OldDoFn<Iterable<T>, T> implements OldDoFn.RequiresWindowAccess {
     private final PCollectionView<?> view;
     private final Coder<T> dataCoder;
 
@@ -2553,7 +2552,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
     @Override
     public void processElement(ProcessContext c) {
       c.output(Arrays.asList(c.element()));
@@ -2716,7 +2715,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     @Nullable
     private PTransform<?, ?> transform;
     @Nullable
-    private DoFn<?, ?> doFn;
+    private OldDoFn<?, ?> doFn;
 
     /**
      * Builds an instance of this class from the overridden transform.

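For illustration, the user-facing effect of this rename is purely mechanical: an
override-style DoFn changes only its supertype. A minimal sketch (the class name
WrapInListFn is hypothetical; the processElement signature matches the hunks above):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.beam.sdk.transforms.OldDoFn;

    // Hypothetical user code: formerly "extends DoFn<T, List<T>>"; after this
    // commit only the supertype name changes, the body is untouched.
    class WrapInListFn<T> extends OldDoFn<T, List<T>> {
      @Override
      public void processElement(ProcessContext c) {
        c.output(Arrays.asList(c.element()));
      }
    }
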
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
index 5f808a5..d4f9a90 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.dataflow.internal;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -63,9 +63,9 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>>
     } else {
       // If the windowFn didn't change, we just run a pass-through transform and then set the
       // new windowing strategy.
-      return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
+      return input.apply("Identity", ParDo.of(new OldDoFn<T, T>() {
         @Override
-        public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
+        public void processElement(OldDoFn<T, T>.ProcessContext c) throws Exception {
           c.output(c.element());
         }
       })).setWindowingStrategyInternal(outputStrategy);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index f83acbc..2017313 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -18,32 +18,32 @@
 package org.apache.beam.runners.dataflow.util;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 
 import java.io.Serializable;
 
 /**
- * Wrapper class holding the necessary information to serialize a DoFn.
+ * Wrapper class holding the necessary information to serialize an OldDoFn.
  *
- * @param <InputT> the type of the (main) input elements of the DoFn
- * @param <OutputT> the type of the (main) output elements of the DoFn
+ * @param <InputT> the type of the (main) input elements of the OldDoFn
+ * @param <OutputT> the type of the (main) output elements of the OldDoFn
  */
 public class DoFnInfo<InputT, OutputT> implements Serializable {
-  private final DoFn<InputT, OutputT> doFn;
+  private final OldDoFn<InputT, OutputT> doFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Iterable<PCollectionView<?>> sideInputViews;
   private final Coder<InputT> inputCoder;
 
-  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
+  public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy) {
     this.doFn = doFn;
     this.windowingStrategy = windowingStrategy;
     this.sideInputViews = null;
     this.inputCoder = null;
   }
 
-  public DoFnInfo(DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
+  public DoFnInfo(OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy,
                   Iterable<PCollectionView<?>> sideInputViews, Coder<InputT> inputCoder) {
     this.doFn = doFn;
     this.windowingStrategy = windowingStrategy;
@@ -51,7 +51,7 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
     this.inputCoder = inputCoder;
   }
 
-  public DoFn<InputT, OutputT> getDoFn() {
+  public OldDoFn<InputT, OutputT> getDoFn() {
     return doFn;
   }
 

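A sketch of the renamed wrapper in use, assuming the two-argument constructor from
the hunk above and that WindowingStrategy.globalDefault() is available in this SDK
version (the helper name wrapIdentityFn is hypothetical):

    // Bundle an OldDoFn with its windowing strategy for serialization; the
    // renamed getter hands the OldDoFn back to the runner at execution time.
    static DoFnInfo<String, String> wrapIdentityFn() {
      OldDoFn<String, String> fn = new OldDoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element());
        }
      };
      return new DoFnInfo<>(fn, WindowingStrategy.globalDefault());
    }
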
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 7d89735..2a01c03 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -506,7 +506,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   /**
-   * Returns a Step for a DoFn by creating and translating a pipeline.
+   * Returns a Step for an OldDoFn by creating and translating a pipeline.
    */
   private static Step createPredefinedStep() throws Exception {
     DataflowPipelineOptions options = buildPipelineOptions();
@@ -530,7 +530,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     return step;
   }
 
-  private static class NoOpFn extends DoFn<String, String> {
+  private static class NoOpFn extends OldDoFn<String, String> {
     @Override public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
@@ -864,7 +864,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
     Pipeline pipeline = Pipeline.create(options);
 
-    DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
+    OldDoFn<Integer, Integer> fn1 = new OldDoFn<Integer, Integer>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
         c.output(c.element());
@@ -880,7 +880,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
       }
     };
 
-    DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
+    OldDoFn<Integer, Integer> fn2 = new OldDoFn<Integer, Integer>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
         c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 4951043..0677030 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -25,8 +25,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -44,7 +44,7 @@ public class WordCount {
    * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
    * pipeline.
    */
-  static class ExtractWordsFn extends DoFn<String, String> {
+  static class ExtractWordsFn extends OldDoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 

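For context, a sketch of how such an aggregator-carrying OldDoFn is typically
completed (the emptyLines field and createAggregator call are from the hunk above;
the tokenizing regex is an assumption based on the standard WordCount example):

    static class ExtractWordsFn extends OldDoFn<String, String> {
      private final Aggregator<Long, Long> emptyLines =
          createAggregator("emptyLines", new Sum.SumLongFn());

      @Override
      public void processElement(ProcessContext c) {
        // Count blank lines via the aggregator, then emit each word.
        if (c.element().trim().isEmpty()) {
          emptyLines.addValue(1L);
        }
        for (String word : c.element().split("[^a-zA-Z']+")) {
          if (!word.isEmpty()) {
            c.output(word);
          }
        }
      }
    }
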
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index b5888bd..f4ce516 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -19,7 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -39,7 +39,7 @@ import java.util.Map;
 public class DoFnFunction<InputT, OutputT>
     implements FlatMapFunction<Iterator<WindowedValue<InputT>>,
     WindowedValue<OutputT>> {
-  private final DoFn<InputT, OutputT> mFunction;
+  private final OldDoFn<InputT, OutputT> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
@@ -48,7 +48,7 @@ public class DoFnFunction<InputT, OutputT>
    * @param runtime    Runtime to apply function in.
    * @param sideInputs Side inputs used in DoFunction.
    */
-  public DoFnFunction(DoFn<InputT, OutputT> fn,
+  public DoFnFunction(OldDoFn<InputT, OutputT> fn,
                SparkRuntimeContext runtime,
                Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     this.mFunction = fn;
@@ -69,7 +69,7 @@ public class DoFnFunction<InputT, OutputT>
 
     private final List<WindowedValue<OutputT>> outputs = new LinkedList<>();
 
-    ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+    ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
         BroadcastHelper<?>> sideInputs) {
       super(fn, runtimeContext, sideInputs);
     }

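A sketch of how the translator applies this wrapper, mirroring the mapPartitions
call in the TransformTranslator hunk further below (fn, runtimeContext, and rdd
are assumed to be in scope; side inputs are null, as in the window-assignment
path):

    // Wrap an OldDoFn so Spark can run it over each partition of windowed values.
    DoFnFunction<String, String> dofn = new DoFnFunction<>(fn, runtimeContext, null);
    JavaRDD<WindowedValue<String>> output = rdd.mapPartitions(dofn);
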
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index daa767d..e33578d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -19,7 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -45,13 +45,13 @@ import scala.Tuple2;
  */
 class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
-  private final DoFn<InputT, OutputT> mFunction;
+  private final OldDoFn<InputT, OutputT> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final TupleTag<OutputT> mMainOutputTag;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
   MultiDoFnFunction(
-      DoFn<InputT, OutputT> fn,
+      OldDoFn<InputT, OutputT> fn,
       SparkRuntimeContext runtimeContext,
       TupleTag<OutputT> mainOutputTag,
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
@@ -75,7 +75,7 @@ class MultiDoFnFunction<InputT, OutputT>
 
     private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
 
-    ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+    ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
         BroadcastHelper<?>> sideInputs) {
       super(fn, runtimeContext, sideInputs);
     }

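MultiDoFnFunction backs the multi-output form of ParDo; as a hedged sketch, the
user-facing pattern it executes looks like the following (the tags and the length
predicate are hypothetical; withOutputTags and sideOutput are the pre-rename
multi-output API):

    final TupleTag<String> shortWords = new TupleTag<String>() {};
    final TupleTag<String> longWords = new TupleTag<String>() {};

    PCollectionTuple results = words.apply(ParDo
        .withOutputTags(shortWords, TupleTagList.of(longWords))
        .of(new OldDoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            if (c.element().length() <= 4) {
              c.output(c.element());                // main output
            } else {
              c.sideOutput(longWords, c.element()); // side output
            }
          }
        }));
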
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index cad2a8e..58ac03c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimerInternals;
@@ -50,17 +50,17 @@ import java.util.Map;
  * Spark runner process context.
  */
 public abstract class SparkProcessContext<InputT, OutputT, ValueT>
-    extends DoFn<InputT, OutputT>.ProcessContext {
+    extends OldDoFn<InputT, OutputT>.ProcessContext {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
 
-  private final DoFn<InputT, OutputT> fn;
+  private final OldDoFn<InputT, OutputT> fn;
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
   protected WindowedValue<InputT> windowedValue;
 
-  SparkProcessContext(DoFn<InputT, OutputT> fn,
+  SparkProcessContext(OldDoFn<InputT, OutputT> fn,
       SparkRuntimeContext runtime,
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     fn.super();
@@ -135,9 +135,9 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
 
   @Override
   public BoundedWindow window() {
-    if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+    if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
       throw new UnsupportedOperationException(
-          "window() is only available in the context of a DoFn marked as RequiresWindowAccess.");
+          "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
     }
     return Iterables.getOnlyElement(windowedValue.getWindows());
   }
@@ -200,7 +200,7 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
   protected abstract Iterator<ValueT> getOutputIterator();
 
   protected Iterable<ValueT> getOutputIterable(final Iterator<WindowedValue<InputT>> iter,
-                                               final DoFn<InputT, OutputT> doFn) {
+                                               final OldDoFn<InputT, OutputT> doFn) {
     return new Iterable<ValueT>() {
       @Override
       public Iterator<ValueT> iterator() {
@@ -212,11 +212,11 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT>
   private class ProcCtxtIterator extends AbstractIterator<ValueT> {
 
     private final Iterator<WindowedValue<InputT>> inputIterator;
-    private final DoFn<InputT, OutputT> doFn;
+    private final OldDoFn<InputT, OutputT> doFn;
     private Iterator<ValueT> outputIterator;
     private boolean calledFinish;
 
-    ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, DoFn<InputT, OutputT> doFn) {
+    ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, OldDoFn<InputT, OutputT> doFn) {
       this.inputIterator = iterator;
       this.doFn = doFn;
       this.outputIterator = getOutputIterator();

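The window() guard above enforces the user-side contract: only an OldDoFn that
opts in via the marker interface may call ProcessContext.window(). A minimal
sketch (PassThroughWithWindowFn is hypothetical):

    class PassThroughWithWindowFn<T> extends OldDoFn<T, T>
        implements OldDoFn.RequiresWindowAccess {
      @Override
      public void processElement(ProcessContext c) {
        // Legal only because of the RequiresWindowAccess marker; otherwise the
        // runner throws UnsupportedOperationException, as above.
        BoundedWindow window = c.window();
        c.output(c.element());
      }
    }
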
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c5d5802..c51a500 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -39,8 +39,8 @@ import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -94,6 +94,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+
 import scala.Tuple2;
 
 /**
@@ -203,7 +204,7 @@ public final class TransformTranslator {
         WindowingStrategy<?, W> windowingStrategy =
             (WindowingStrategy<?, W>) transform.getWindowingStrategy();
 
-        DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
+        OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
             new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
                 windowingStrategy,
                 new InMemoryStateInternalsFactory<K>(),
@@ -768,7 +769,7 @@ public final class TransformTranslator {
                 && windowFn instanceof GlobalWindows)) {
           context.setOutputRDD(transform, inRDD);
         } else {
-          DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+          OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
           DoFnFunction<T, T> dofn =
               new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null);
           context.setOutputRDD(transform, inRDD.mapPartitions(dofn));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 8154cd7..b0fb931 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -315,7 +315,7 @@ public final class StreamingTransformTranslator {
           sec.setStream(transform, dStream.window(windowDuration, slideDuration));
         }
         //--- then we apply windowing to the elements
-        DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
+        OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
         DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
             ((StreamingEvaluationContext) context).getRuntimeContext(), null);
         @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index d1f8d12..e4a293f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -101,7 +101,7 @@ public class TfIdfTest {
       // of the words in the document associated with that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
           .apply("SplitWords", ParDo.of(
-              new DoFn<KV<URI, String>, KV<URI, String>>() {
+              new OldDoFn<KV<URI, String>, KV<URI, String>>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
@@ -144,7 +144,7 @@ public class TfIdfTest {
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
           .apply("ShiftKeys", ParDo.of(
-              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+              new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey().getKey();
@@ -183,7 +183,7 @@ public class TfIdfTest {
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
           .apply("ComputeTermFrequencies", ParDo.of(
-              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
@@ -208,7 +208,7 @@ public class TfIdfTest {
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
-              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+              .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
@@ -237,7 +237,7 @@ public class TfIdfTest {
       // divided by the log of the document frequency.
       return wordToUriAndTfAndDf
           .apply("ComputeTfIdf", ParDo.of(
-              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();

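The ComputeDocFrequencies hunk above elides the side-input read; a sketch of the
complete pattern (totalDocuments is the PCollectionView defined elsewhere in this
test, assumed final and in scope):

    PCollection<KV<String, Double>> wordToDf = wordToDocCount
        .apply("ComputeDocFrequencies", ParDo
            .withSideInputs(totalDocuments)
            .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
              @Override
              public void processElement(ProcessContext c) {
                // Divide the per-word document count by the total number of
                // documents, read from the singleton side input.
                long totalDocs = c.sideInput(totalDocuments);
                c.output(KV.of(c.element().getKey(),
                    c.element().getValue() / (double) totalDocs));
              }
            }));
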
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 600217d..2e477e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -70,7 +70,7 @@ public class CombinePerKeyTest {
     private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
       @Override
       public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
-          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
+          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new OldDoFn<T, KV<T, Long>>() {
               @Override
               public void processElement(ProcessContext processContext) throws Exception {
                   processContext.output(KV.of(processContext.element(), 1L));

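A sketch of how SumPerKey continues from the hunk above, assuming
Sum.<T>longsPerKey() as the combiner:

    // Pair every element with 1L via the renamed OldDoFn (as in the hunk),
    // then combine per key to produce the counts.
    return withLongs.apply(Sum.<T>longsPerKey());
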
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index 0f60271..263ce99 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -46,7 +46,7 @@ public class DoFnOutputTest implements Serializable {
     PCollection<String> strings = pipeline.apply(Create.of("a"));
     // Test that values written from startBundle() and finishBundle() are written to
     // the output
-    PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() {
+    PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() {
       @Override
       public void startBundle(Context c) throws Exception {
         c.output("start");

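For completeness, a sketch of the full bundle-lifecycle OldDoFn that the hunk
above truncates (the finishBundle body is an assumption consistent with the
test's "start"/"finish" naming):

    PCollection<String> output = strings.apply(ParDo.of(new OldDoFn<String, String>() {
      @Override
      public void startBundle(Context c) throws Exception {
        c.output("start");      // emitted once per bundle, before any element
      }
      @Override
      public void processElement(ProcessContext c) throws Exception {
        c.output(c.element());
      }
      @Override
      public void finishBundle(Context c) throws Exception {
        c.output("finish");     // emitted once per bundle, after all elements
      }
    }));
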
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index ded3eb2..739eec3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.ApproximateUnique;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -103,9 +103,9 @@ public class MultiOutputWordCountTest {
   }
 
   /**
-   * A DoFn that tokenizes lines of text into individual words.
+   * An OldDoFn that tokenizes lines of text into individual words.
    */
-  static class ExtractWordsFn extends DoFn<String, String> {
+  static class ExtractWordsFn extends OldDoFn<String, String> {
 
     private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
         new Sum.SumIntegerFn());
@@ -170,7 +170,7 @@ public class MultiOutputWordCountTest {
     }
   }
 
-  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+  private static class FormatCountsFn extends OldDoFn<KV<String, Long>, String> {
     @Override
     public void processElement(ProcessContext c) {
       c.output(c.element().getKey() + ": " + c.element().getValue());