You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:52:23 UTC

[05/51] [abbrv] incubator-beam git commit: Rename DoFn to OldDoFn

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 5b9eeff..5e96c46 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -145,9 +145,9 @@ public class SerializationTest {
   }
 
   /**
-   * A DoFn that tokenizes lines of text into individual words.
+   * A OldDoFn that tokenizes lines of text into individual words.
    */
-  static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
+  static class ExtractWordsFn extends OldDoFn<StringHolder, StringHolder> {
     private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
@@ -173,9 +173,9 @@ public class SerializationTest {
   }
 
   /**
-   * A DoFn that converts a Word and Count into a printable string.
+   * A OldDoFn that converts a Word and Count into a printable string.
    */
-  private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
+  private static class FormatCountsFn extends OldDoFn<KV<StringHolder, Long>, StringHolder> {
     @Override
     public void processElement(ProcessContext c) {
       c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 60b7f71..5775565 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 
 import org.junit.After;
@@ -54,7 +54,7 @@ public class SideEffectsTest implements Serializable {
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 
-    pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
+    pipeline.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() {
       @Override
       public void processElement(ProcessContext c) throws Exception {
         throw new UserException();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 904b448..c005f14 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -122,7 +122,7 @@ public class KafkaStreamingTest {
     EMBEDDED_ZOOKEEPER.shutdown();
   }
 
-  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+  private static class FormatKVFn extends OldDoFn<KV<String, String>, String> {
     @Override
     public void processElement(ProcessContext c) {
       c.output(c.element().getKey() + "," + c.element().getValue());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 873a591..da4db93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 9db6650..c34ce66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.coders;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-
 import org.joda.time.Duration;
 import org.joda.time.ReadableDuration;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 693791c..d41bd1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import com.google.common.base.Converter;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-
 import org.joda.time.Instant;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index ecb1f0a..182fa1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -709,7 +709,7 @@ public class PubsubIO {
        *
        * <p>Public so can be suppressed by runners.
        */
-      public class PubsubBoundedReader extends DoFn<Void, T> {
+      public class PubsubBoundedReader extends OldDoFn<Void, T> {
         private static final int DEFAULT_PULL_SIZE = 100;
         private static final int ACK_TIMEOUT_SEC = 60;
 
@@ -998,7 +998,7 @@ public class PubsubIO {
        *
        * <p>Public so can be suppressed by runners.
        */
-      public class PubsubBoundedWriter extends DoFn<T, Void> {
+      public class PubsubBoundedWriter extends OldDoFn<T, Void> {
         private static final int MAX_PUBLISH_BATCH_SIZE = 100;
         private transient List<OutgoingMessage> output;
         private transient PubsubClient pubsubClient;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 6f2b3ac..9e9536d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -78,7 +78,7 @@ import javax.annotation.Nullable;
  * <li>We try to send messages in batches while also limiting send latency.
  * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
  * <li>Though some background threads are used by the underlying netty system all actual Pubsub
- * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances
  * to execute concurrently and hide latency.
  * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
  * to dedup messages.
@@ -155,7 +155,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   /**
    * Convert elements to messages and shard them.
    */
-  private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
+  private static class ShardFn<T> extends OldDoFn<T, KV<Integer, OutgoingMessage>> {
     private final Aggregator<Long, Long> elementCounter =
         createAggregator("elements", new Sum.SumLongFn());
     private final Coder<T> elementCoder;
@@ -207,7 +207,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    * Publish messages to Pubsub in batches.
    */
   private static class WriterFn
-      extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+      extends OldDoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
     private final PubsubClientFactory pubsubFactory;
     private final TopicPath topic;
     private final String timestampLabel;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 07d355e..d98bd6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -1107,7 +1107,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // StatsFn
   // ================================================================================
 
-  private static class StatsFn<T> extends DoFn<T, T> {
+  private static class StatsFn<T> extends OldDoFn<T, T> {
     private final Aggregator<Long, Long> elementCounter =
         createAggregator("elements", new Sum.SumLongFn());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index b8902f9..de00035 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.io;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
-
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
 import org.joda.time.Instant;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 42d3c05..3e997b0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -156,7 +156,7 @@ public class Write {
      * Writes all the elements in a bundle using a {@link Writer} produced by the
      * {@link WriteOperation} associated with the {@link Sink}.
      */
-    private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+    private class WriteBundles<WriteT> extends OldDoFn<T, WriteT> {
       // Writer that will write the records in this bundle. Lazily
       // initialized in processElement.
       private Writer<T, WriteT> writer = null;
@@ -182,7 +182,7 @@ public class Write {
           // Discard write result and close the write.
           try {
             writer.close();
-            // The writer does not need to be reset, as this DoFn cannot be reused.
+            // The writer does not need to be reset, as this OldDoFn cannot be reused.
           } catch (Exception closeException) {
             if (closeException instanceof InterruptedException) {
               // Do not silently ignore interrupted state.
@@ -217,7 +217,7 @@ public class Write {
      *
      * @see WriteBundles
      */
-    private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+    private class WriteShardedBundles<WriteT> extends OldDoFn<KV<Integer, Iterable<T>>, WriteT> {
       private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
 
       WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
@@ -296,10 +296,11 @@ public class Write {
      * <p>This singleton collection containing the WriteOperation is then used as a side input to a
      * ParDo over the PCollection of elements to write. In this bundle-writing phase,
      * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-     * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
-     * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for every
-     * element in the bundle. The output of this ParDo is a PCollection of <i>writer result</i>
-     * objects (see {@link Sink} for a description of writer results)-one for each bundle.
+     * {@link Writer#open} and {@link Writer#close} are called in {@link OldDoFn#startBundle} and
+     * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
+     * every element in the bundle. The output of this ParDo is a PCollection of
+     * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
+     * each bundle.
      *
      * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
      * the collection of writer results as a side-input. In this ParDo,
@@ -333,7 +334,7 @@ public class Write {
       // Initialize the resource in a do-once ParDo on the WriteOperation.
       operationCollection = operationCollection
           .apply("Initialize", ParDo.of(
-              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+              new OldDoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
             @Override
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();
@@ -387,7 +388,7 @@ public class Write {
       // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
       // collection as a side input), so it will happen after the parallel write.
       operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+          .apply("Finalize", ParDo.of(new OldDoFn<WriteOperation<T, WriteT>, Integer>() {
             @Override
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index e0a1ef3..b2df96e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -29,7 +29,6 @@ import com.google.common.base.Strings;
 import com.google.common.io.Files;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index e89e5ad..aa9f13e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -22,8 +22,8 @@ import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.Context;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 
 import com.google.auto.service.AutoService;
@@ -52,7 +52,7 @@ import javax.annotation.concurrent.ThreadSafe;
  * and {@link PipelineOptionsFactory#as(Class)}. They can be created
  * from command-line arguments with {@link PipelineOptionsFactory#fromArgs(String[])}.
  * They can be converted to another type by invoking {@link PipelineOptions#as(Class)} and
- * can be accessed from within a {@link DoFn} by invoking
+ * can be accessed from within a {@link OldDoFn} by invoking
  * {@link Context#getPipelineOptions()}.
  *
  * <p>For example:
@@ -151,7 +151,7 @@ import javax.annotation.concurrent.ThreadSafe;
  * {@link PipelineOptionsFactory#withValidation()} is invoked.
  *
  * <p>{@link JsonIgnore @JsonIgnore} is used to prevent a property from being serialized and
- * available during execution of {@link DoFn}. See the Serialization section below for more
+ * available during execution of {@link OldDoFn}. See the Serialization section below for more
  * details.
  *
  * <h2>Registration Of PipelineOptions</h2>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index f21b9b9..67fa2af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -53,7 +53,6 @@ import com.google.common.collect.TreeMultimap;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
index 815de82..607bdda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.options;
 
 import org.apache.beam.sdk.util.common.ReflectHelpers;
+
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
index a42ece2..6f6836e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java
@@ -19,14 +19,14 @@ package org.apache.beam.sdk.runners;
 
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 
 import java.util.Collection;
 import java.util.Map;
 
 /**
  * A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link DoFn} are emitted on a per-{@code DoFn}-application basis.
+ * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis.
  *
  * @param <T> the output type of the aggregator
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index a202ed4..80340c2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -33,11 +33,11 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -762,7 +762,7 @@ public class PAssert {
           .apply("RewindowActuals", rewindowActuals.<T>windowActuals())
           .apply(
               ParDo.of(
-                  new DoFn<T, T>() {
+                  new OldDoFn<T, T>() {
                     @Override
                     public void processElement(ProcessContext context) throws CoderException {
                       context.output(CoderUtils.clone(coder, context.element()));
@@ -884,7 +884,7 @@ public class PAssert {
     }
   }
 
-  private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+  private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> {
     @Override
     public void processElement(ProcessContext c) throws Exception {
       c.output(Iterables.concat(c.element()));
@@ -995,13 +995,13 @@ public class PAssert {
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
+   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a
    * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
    */
-  private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
+  private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1030,13 +1030,13 @@ public class PAssert {
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single iterable element of the input {@link PCollection} and adjusts counters and
    * thrown exceptions for use in testing.
    *
    * <p>The singleton property is presumed, not enforced.
    */
-  private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
+  private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1061,14 +1061,14 @@ public class PAssert {
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single item contained within the single iterable on input and
    * adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
    * each input element must be a singleton iterable, or this will fail.
    */
-  private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
+  private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1310,7 +1310,7 @@ public class PAssert {
   }
 
   /**
-   * A DoFn that filters elements based on their presence in a static collection of windows.
+   * A OldDoFn that filters elements based on their presence in a static collection of windows.
    */
   private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
     private final StaticWindows windows;
@@ -1324,7 +1324,7 @@ public class PAssert {
       return input.apply("FilterWindows", ParDo.of(new Fn()));
     }
 
-    private class Fn extends DoFn<T, T> implements RequiresWindowAccess {
+    private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess {
       @Override
       public void processElement(ProcessContext c) throws Exception {
         if (windows.getWindows().contains(c.window())) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
index 45b0592..4e0c0be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
@@ -35,7 +35,6 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-
 import javax.annotation.Nullable;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 0de3024..98cdeba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -37,7 +37,6 @@ import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index ff553ba..c4596c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -21,6 +21,7 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
+
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index c8aad78..db4ab33 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -24,8 +24,9 @@ import org.apache.beam.sdk.util.ExecutionContext;
  * An {@code Aggregator<InputT>} enables monitoring of values of type {@code InputT},
  * to be combined across all bundles.
  *
- * <p>Aggregators are created by calling {@link DoFn#createAggregator DoFn.createAggregatorForDoFn},
- * typically from the {@link DoFn} constructor. Elements can be added to the
+ * <p>Aggregators are created by calling
+ * {@link OldDoFn#createAggregator OldDoFn.createAggregatorForDoFn},
+ * typically from the {@link OldDoFn} constructor. Elements can be added to the
  * {@code Aggregator} by calling {@link Aggregator#addValue}.
  *
  * <p>Aggregators are visible in the monitoring UI, when the pipeline is run
@@ -36,7 +37,7 @@ import org.apache.beam.sdk.util.ExecutionContext;
  *
  * <p>Example:
  * <pre> {@code
- * class MyDoFn extends DoFn<String, String> {
+ * class MyDoFn extends OldDoFn<String, String> {
  *   private Aggregator<Integer, Integer> myAggregator;
  *
  *   public MyDoFn() {
@@ -78,8 +79,9 @@ public interface Aggregator<InputT, OutputT> {
     /**
      * Create an aggregator with the given {@code name} and {@link CombineFn}.
      *
-     *  <p>This method is called to create an aggregator for a {@link DoFn}. It receives the class
-     *  of the {@link DoFn} being executed and the context of the step it is being executed in.
+     *  <p>This method is called to create an aggregator for a {@link OldDoFn}. It receives the
+     *  class of the {@link OldDoFn} being executed and the context of the step it is being
+     *  executed in.
      */
     <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
         Class<?> fnClass, ExecutionContext.StepContext stepContext,
@@ -88,7 +90,7 @@ public interface Aggregator<InputT, OutputT> {
 
   // TODO: Consider the following additional API conveniences:
   // - In addition to createAggregatorForDoFn(), consider adding getAggregator() to
-  //   avoid the need to store the aggregator locally in a DoFn, i.e., create
+  //   avoid the need to store the aggregator locally in a OldDoFn, i.e., create
   //   if not already present.
   // - Add a shortcut for the most common aggregator:
   //   c.createAggregatorForDoFn("name", new Sum.SumIntegerFn()).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index 97961e9..abed843 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.transforms;
 import java.util.Collection;
 
 /**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}.
+ * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}.
  */
 public final class AggregatorRetriever {
   private AggregatorRetriever() {
@@ -28,9 +28,9 @@ public final class AggregatorRetriever {
   }
 
   /**
-   * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}.
+   * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}.
    */
-  public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
+  public static Collection<Aggregator<?, ?>> getAggregators(OldDoFn<?, ?> fn) {
     return fn.getAggregators();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 96c03eb..6fc2324 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1473,9 +1473,9 @@ public class Combine {
       PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
           .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
           .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
-              new DoFn<Void, OutputT>() {
+              new OldDoFn<Void, OutputT>() {
                 @Override
-                public void processElement(DoFn<Void, OutputT>.ProcessContext c) {
+                public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) {
                   Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator();
                   if (!combined.hasNext()) {
                     c.output(defaultValue);
@@ -2097,7 +2097,7 @@ public class Combine {
       final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
       final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
       PCollectionTuple split = input.apply("AddNonce", ParDo.of(
-          new DoFn<KV<K, InputT>, KV<K, InputT>>() {
+          new OldDoFn<KV<K, InputT>, KV<K, InputT>>() {
             transient int counter;
             @Override
             public void startBundle(Context c) {
@@ -2135,8 +2135,8 @@ public class Combine {
           .setWindowingStrategyInternal(preCombineStrategy)
           .apply("PreCombineHot", Combine.perKey(hotPreCombine))
           .apply("StripNonce", ParDo.of(
-              new DoFn<KV<KV<K, Integer>, AccumT>,
-                       KV<K, InputOrAccum<InputT, AccumT>>>() {
+              new OldDoFn<KV<KV<K, Integer>, AccumT>,
+                                     KV<K, InputOrAccum<InputT, AccumT>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   c.output(KV.of(
@@ -2151,7 +2151,7 @@ public class Combine {
           .get(cold)
           .setCoder(inputCoder)
           .apply("PrepareCold", ParDo.of(
-              new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+              new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
                 @Override
                 public void processElement(ProcessContext c) {
                   c.output(KV.of(c.element().getKey(),
@@ -2359,7 +2359,7 @@ public class Combine {
       final PerKeyCombineFnRunner<? super K, ? super InputT, ?, OutputT> combineFnRunner =
           PerKeyCombineFnRunners.create(fn);
       PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
-          new DoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
+          new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
             @Override
             public void processElement(ProcessContext c) {
               K key = c.element().getKey();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index f2ed5e1..777deba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -90,7 +90,7 @@ public class CombineFns {
    *
    * PCollection<T> finalResultCollection = maxAndMean
    *     .apply(ParDo.of(
-   *         new DoFn<KV<K, CoCombineResult>, T>() {
+   *         new OldDoFn<KV<K, CoCombineResult>, T>() {
    *           @Override
    *           public void processElement(ProcessContext c) throws Exception {
    *             KV<K, CoCombineResult> e = c.element();
@@ -133,7 +133,7 @@ public class CombineFns {
    *
    * PCollection<T> finalResultCollection = maxAndMean
    *     .apply(ParDo.of(
-   *         new DoFn<CoCombineResult, T>() {
+   *         new OldDoFn<CoCombineResult, T>() {
    *           @Override
    *           public void processElement(ProcessContext c) throws Exception {
    *             CoCombineResult e = c.element();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 3a0fb5d..7601ffc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,7 +107,7 @@ public class Count {
     public PCollection<KV<T, Long>> apply(PCollection<T> input) {
       return
           input
-          .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
+          .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
             @Override
             public void processElement(ProcessContext c) {
               c.output(KV.of(c.element(), (Void) null));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index fa645ab..fb7f784 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -486,7 +486,7 @@ public class Create<T> {
       this.elementCoder = elementCoder;
     }
 
-    private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
+    private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> {
       @Override
       public void processElement(ProcessContext c) {
         c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
deleted file mode 100644
index 6d5d1ed..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.common.base.MoreObjects;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output.  Unit testing of a {@code DoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFnWithContext} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
-  /**
-   * Information accessible to all methods in this {@code DoFn}.
-   * Used primarily to output elements.
-   */
-  public abstract class Context {
-
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link org.apache.beam.sdk.runners.PipelineRunner}
-     * invoking this {@code DoFn}.  The {@code PipelineOptions} will
-     * be the default running via {@link DoFnTester}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Adds the given element to the main output {@code PCollection}.
-     *
-     * <p>Once passed to {@code output} the element should be considered
-     * immutable and not be modified in any way. It may be cached or retained
-     * by the Dataflow runtime or later steps in the pipeline, or used in
-     * other unspecified ways.
-     *
-     * <p>If invoked from {@link DoFn#processElement processElement}, the output
-     * element will have the same timestamp and be in the same windows
-     * as the input element passed to {@link DoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Adds the given element to the main output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Adds the given element to the side output {@code PCollection} with the
-     * given tag.
-     *
-     * <p>Once passed to {@code sideOutput} the element should not be modified
-     * in any way.
-     *
-     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
-     * specify the tags of side outputs that it consumes. Non-consumed side
-     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
-     * need to be specified.
-     *
-     * <p>The output element will have the same timestamp and be in the same
-     * windows as the input element passed to {@link DoFn#processElement processElement}.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-    /**
-     * Adds the given element to the specified side output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
-     * this will attempt to use the
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutputWithTimestamp(
-        TupleTag<T> tag, T output, Instant timestamp);
-
-    /**
-     * Creates an {@link Aggregator} in the {@link DoFn} context with the
-     * specified name and aggregation logic specified by {@link CombineFn}.
-     *
-     * <p>For internal use only.
-     *
-     * @param name the name of the aggregator
-     * @param combiner the {@link CombineFn} to use in the aggregator
-     * @return an aggregator for the provided name and {@link CombineFn} in this
-     *         context
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
-    /**
-     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are
-     * usable within this context.
-     *
-     * <p>This method should be called by runners before {@link DoFn#startBundle}
-     * is executed.
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected final void setupDelegateAggregators() {
-      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
-        setupDelegateAggregator(aggregator);
-      }
-
-      aggregatorsAreFinal = true;
-    }
-
-    private final <AggInputT, AggOutputT> void setupDelegateAggregator(
-        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-
-      Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
-          aggregator.getName(), aggregator.getCombineFn());
-
-      aggregator.setDelegate(delegate);
-    }
-  }
-
-  /**
-   * Information accessible when running {@link DoFn#processElement}.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Returns the input element to be processed.
-     *
-     * <p>The element should be considered immutable. The Dataflow runtime will not mutate the
-     * element, so it is safe to cache, etc. The element should not be mutated by any of the
-     * {@link DoFn} methods, because it may be cached elsewhere, retained by the Dataflow runtime,
-     * or used in other unspecified ways.
-     */
-    public abstract InputT element();
-
-    /**
-     * Returns the value of the side input for the window corresponding to the
-     * window of the main input element.
-     *
-     * <p>See
-     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
-     * for how this corresponding window is determined.
-     *
-     * @throws IllegalArgumentException if this is not a side input
-     * @see ParDo#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Returns the timestamp of the input element.
-     *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns the window into which the input element has been assigned.
-     *
-     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     *
-     * @throws UnsupportedOperationException if this {@link DoFn} does
-     * not implement {@link RequiresWindowAccess}.
-     */
-    public abstract BoundedWindow window();
-
-    /**
-     * Returns information about the pane within this window into which the
-     * input element has been assigned.
-     *
-     * <p>Generally all data is in a single, uninteresting pane unless custom
-     * triggering and/or late data has been explicitly requested.
-     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract PaneInfo pane();
-
-    /**
-     * Returns the process context to use for implementing windowing.
-     */
-    @Experimental
-    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link DoFn.Context#outputWithTimestamp}.
-   *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward to future.  For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
-   *
-   * <p> Note that producing an element whose timestamp is less than the
-   * current timestamp may result in late data, i.e. returning a non-zero
-   * value here does not impact watermark calculations used for firing
-   * windows.
-   *
-   * @deprecated does not interact well with the watermark.
-   */
-  @Deprecated
-  public Duration getAllowedTimestampSkew() {
-    return Duration.ZERO;
-  }
-
-  /**
-   * Interface for signaling that a {@link DoFn} needs to access the window the
-   * element is being processed in, via {@link DoFn.ProcessContext#window}.
-   */
-  @Experimental
-  public interface RequiresWindowAccess {}
-
-  public DoFn() {
-    this(new HashMap<String, DelegatingAggregator<?, ?>>());
-  }
-
-  DoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
-    this.aggregators = aggregators;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
-  /**
-   * Protects aggregators from being created after initialization.
-   */
-  private boolean aggregatorsAreFinal;
-
-  /**
-   * Prepares this {@code DoFn} instance for processing a batch of elements.
-   *
-   * <p>By default, does nothing.
-   */
-  public void startBundle(Context c) throws Exception {
-  }
-
-  /**
-   * Processes one input element.
-   *
-   * <p>The current element of the input {@code PCollection} is returned by
-   * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Dataflow
-   * runtime will not mutate the element, so it is safe to cache, etc. The element should not be
-   * mutated by any of the {@link DoFn} methods, because it may be cached elsewhere, retained by the
-   * Dataflow runtime, or used in other unspecified ways.
-   *
-   * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
-   * Once passed to {@code output} the element should be considered immutable and not be modified in
-   * any way. It may be cached elsewhere, retained by the Dataflow runtime, or used in other
-   * unspecified ways.
-   *
-   * @see ProcessContext
-   */
-  public abstract void processElement(ProcessContext c) throws Exception;
-
-  /**
-   * Finishes processing this batch of elements.
-   *
-   * <p>By default, does nothing.
-   */
-  public void finishBundle(Context c) throws Exception {
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code DoFn} instance's most-derived
-   * class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(getClass()) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code DoFn} instance's
-   * most-derived class.
-   *
-   * <p>In the normal case of a concrete {@code DoFn} subclass with
-   * no generic type parameters of its own (including anonymous inner
-   * classes), this will be a complete non-generic type, which is good
-   * for choosing a default output {@code Coder<OutputT>} for the output
-   * {@code PCollection<OutputT>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(getClass()) {};
-  }
-
-  /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the DoFn. Aggregators can only be created
-   * during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(!aggregators.containsKey(name),
-        "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
-        name);
-
-    checkState(!aggregatorsAreFinal, "Cannot create an aggregator during DoFn processing."
-        + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, aggregator);
-    return aggregator;
-  }
-
-  /**
-   * Returns an {@link Aggregator} with the aggregation logic specified by the
-   * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within the DoFn. Aggregators can only be
-   * created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline processing.
-   */
-  protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
-      SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
-   * Returns the {@link Aggregator Aggregators} created by this {@code DoFn}.
-   */
-  Collection<Aggregator<?, ?>> getAggregators() {
-    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
-  }
-
-  /**
-   * An {@link Aggregator} that delegates calls to addValue to another
-   * aggregator.
-   *
-   * @param <AggInputT> the type of input element
-   * @param <AggOutputT> the type of output element
-   */
-  static class DelegatingAggregator<AggInputT, AggOutputT> implements
-      Aggregator<AggInputT, AggOutputT>, Serializable {
-    private final UUID id;
-
-    private final String name;
-
-    private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
-
-    private Aggregator<AggInputT, ?> delegate;
-
-    public DelegatingAggregator(String name,
-        CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-      this.id = UUID.randomUUID();
-      this.name = checkNotNull(name, "name cannot be null");
-      // Safe contravariant cast
-      @SuppressWarnings("unchecked")
-      CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
-          (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
-      this.combineFn = specificCombiner;
-    }
-
-    @Override
-    public void addValue(AggInputT value) {
-      if (delegate == null) {
-        throw new IllegalStateException(
-            "addValue cannot be called on Aggregator outside of the execution of a DoFn.");
-      } else {
-        delegate.addValue(value);
-      }
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
-      return combineFn;
-    }
-
-    /**
-     * Sets the current delegate of the Aggregator.
-     *
-     * @param delegate the delegate to set in this aggregator
-     */
-    public void setDelegate(Aggregator<AggInputT, ?> delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("name", name)
-          .add("combineFn", combineFn)
-          .toString();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(id, name, combineFn.getClass());
-    }
-
-    /**
-     * Indicates whether some other object is "equal to" this one.
-     *
-     * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
-     * CombineFns are the same class, and they have identical IDs.
-     */
-    @Override
-    public boolean equals(Object o) {
-      if (o == this) {
-        return true;
-      }
-      if (o == null) {
-        return false;
-      }
-      if (o instanceof DelegatingAggregator) {
-        DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
-        return Objects.equals(this.id, that.id)
-            && Objects.equals(this.name, that.name)
-            && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
-      }
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index 0616eff..d8d4181 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -194,7 +194,7 @@ public abstract class DoFnReflector {
    */
   public abstract boolean usesSingleWindow();
 
-  /** Create an {@link DoFnInvoker} bound to the given {@link DoFn}. */
+  /** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */
   public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker(
       DoFnWithContext<InputT, OutputT> fn);
 
@@ -217,9 +217,9 @@ public abstract class DoFnReflector {
   }
 
   /**
-   * Create a {@link DoFn} that the {@link DoFnWithContext}.
+   * Create a {@link OldDoFn} that the {@link DoFnWithContext}.
    */
-  public <InputT, OutputT> DoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
+  public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) {
     if (usesSingleWindow()) {
       return new WindowDoFnAdapter<InputT, OutputT>(this, fn);
     } else {
@@ -287,7 +287,7 @@ public abstract class DoFnReflector {
    * <li>Any generics on the extra context arguments match what is expected. Eg.,
    *     {@code WindowingInternals<InputT, OutputT>} either matches the
    *     {@code InputT} and {@code OutputT} parameters of the
-   *     {@code DoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc.
+   *     {@code OldDoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc.
    * </ol>
    *
    * @param m the method to verify
@@ -328,7 +328,7 @@ public abstract class DoFnReflector {
     AdditionalParameter[] contextInfos = new AdditionalParameter[params.length - 1];
 
     // Fill in the generics in the allExtraContextArgs interface from the types in the
-    // Context or ProcessContext DoFn.
+    // Context or ProcessContext OldDoFn.
     ParameterizedType pt = (ParameterizedType) contextToken.getType();
     // We actually want the owner, since ProcessContext and Context are owned by DoFnWithContext.
     pt = (ParameterizedType) pt.getOwnerType();
@@ -364,18 +364,18 @@ public abstract class DoFnReflector {
     return ImmutableList.copyOf(contextInfos);
   }
 
-  /** Interface for invoking the {@code DoFn} processing methods. */
+  /** Interface for invoking the {@code OldDoFn} processing methods. */
   public interface DoFnInvoker<InputT, OutputT>  {
-    /** Invoke {@link DoFn#startBundle} on the bound {@code DoFn}. */
+    /** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. */
     void invokeStartBundle(
         DoFnWithContext<InputT, OutputT>.Context c,
         ExtraContextFactory<InputT, OutputT> extra);
-    /** Invoke {@link DoFn#finishBundle} on the bound {@code DoFn}. */
+    /** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */
     void invokeFinishBundle(
         DoFnWithContext<InputT, OutputT>.Context c,
         ExtraContextFactory<InputT, OutputT> extra);
 
-    /** Invoke {@link DoFn#processElement} on the bound {@code DoFn}. */
+    /** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */
     public void invokeProcessElement(
         DoFnWithContext<InputT, OutputT>.ProcessContext c,
         ExtraContextFactory<InputT, OutputT> extra);
@@ -565,10 +565,10 @@ public abstract class DoFnReflector {
       extends DoFnWithContext<InputT, OutputT>.Context
       implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
 
-    private DoFn<InputT, OutputT>.Context context;
+    private OldDoFn<InputT, OutputT>.Context context;
 
     private ContextAdapter(
-        DoFnWithContext<InputT, OutputT> fn, DoFn<InputT, OutputT>.Context context) {
+        DoFnWithContext<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
       fn.super();
       this.context = context;
     }
@@ -618,11 +618,11 @@ public abstract class DoFnReflector {
       extends DoFnWithContext<InputT, OutputT>.ProcessContext
       implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> {
 
-    private DoFn<InputT, OutputT>.ProcessContext context;
+    private OldDoFn<InputT, OutputT>.ProcessContext context;
 
     private ProcessContextAdapter(
         DoFnWithContext<InputT, OutputT> fn,
-        DoFn<InputT, OutputT>.ProcessContext context) {
+        OldDoFn<InputT, OutputT>.ProcessContext context) {
       fn.super();
       this.context = context;
     }
@@ -683,7 +683,7 @@ public abstract class DoFnReflector {
     }
   }
 
-  public static Class<?> getDoFnClass(DoFn<?, ?> fn) {
+  public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
     if (fn instanceof SimpleDoFnAdapter) {
       return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
     } else {
@@ -691,7 +691,7 @@ public abstract class DoFnReflector {
     }
   }
 
-  private static class SimpleDoFnAdapter<InputT, OutputT> extends DoFn<InputT, OutputT> {
+  private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
 
     private final DoFnWithContext<InputT, OutputT> fn;
     private transient DoFnInvoker<InputT, OutputT> invoker;
@@ -703,19 +703,19 @@ public abstract class DoFnReflector {
     }
 
     @Override
-    public void startBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
+    public void startBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
       ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
       invoker.invokeStartBundle(adapter, adapter);
     }
 
     @Override
-    public void finishBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
+    public void finishBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception {
       ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c);
       invoker.invokeFinishBundle(adapter, adapter);
     }
 
     @Override
-    public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
+    public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
       ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
       invoker.invokeProcessElement(adapter, adapter);
     }
@@ -743,7 +743,7 @@ public abstract class DoFnReflector {
   }
 
   private static class WindowDoFnAdapter<InputT, OutputT>
-  extends SimpleDoFnAdapter<InputT, OutputT> implements DoFn.RequiresWindowAccess {
+  extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess {
 
     private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) {
       super(reflector, fn);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index a136632..9336e4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -49,12 +49,12 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * A harness for unit-testing a {@link DoFn}.
+ * A harness for unit-testing a {@link OldDoFn}.
  *
  * <p>For example:
  *
  * <pre> {@code
- * DoFn<InputT, OutputT> fn = ...;
+ * OldDoFn<InputT, OutputT> fn = ...;
  *
  * DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn);
  *
@@ -71,22 +71,22 @@ import java.util.Map;
  * Assert.assertThat(fnTester.processBundle(i1, i2, ...), Matchers.hasItems(...));
  * } </pre>
  *
- * @param <InputT> the type of the {@code DoFn}'s (main) input elements
- * @param <OutputT> the type of the {@code DoFn}'s (main) output elements
+ * @param <InputT> the type of the {@code OldDoFn}'s (main) input elements
+ * @param <OutputT> the type of the {@code OldDoFn}'s (main) output elements
  */
 public class DoFnTester<InputT, OutputT> {
   /**
    * Returns a {@code DoFnTester} supporting unit-testing of the given
-   * {@link DoFn}.
+   * {@link OldDoFn}.
    */
   @SuppressWarnings("unchecked")
-  public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+  public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
     return new DoFnTester<InputT, OutputT>(fn);
   }
 
   /**
    * Returns a {@code DoFnTester} supporting unit-testing of the given
-   * {@link DoFn}.
+   * {@link OldDoFn}.
    */
   @SuppressWarnings("unchecked")
   public static <InputT, OutputT> DoFnTester<InputT, OutputT>
@@ -96,12 +96,12 @@ public class DoFnTester<InputT, OutputT> {
 
   /**
    * Registers the tuple of values of the side input {@link PCollectionView}s to
-   * pass to the {@link DoFn} under test.
+   * pass to the {@link OldDoFn} under test.
    *
    * <p>Resets the state of this {@link DoFnTester}.
    *
    * <p>If this isn't called, {@code DoFnTester} assumes the
-   * {@link DoFn} takes no side inputs.
+   * {@link OldDoFn} takes no side inputs.
    */
   public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
     this.sideInputs = sideInputs;
@@ -109,8 +109,8 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Registers the values of a side input {@link PCollectionView} to pass to the {@link DoFn} under
-   * test.
+   * Registers the values of a side input {@link PCollectionView} to pass to the {@link OldDoFn}
+   * under test.
    *
    * <p>The provided value is the final value of the side input in the specified window, not
    * the value of the input PCollection in that window.
@@ -128,7 +128,7 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test.
+   * Whether or not a {@link DoFnTester} should clone the {@link OldDoFn} under test.
    */
   public enum CloningBehavior {
     CLONE,
@@ -136,14 +136,14 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test.
+   * Instruct this {@link DoFnTester} whether or not to clone the {@link OldDoFn} under test.
    */
   public void setCloningBehavior(CloningBehavior newValue) {
     this.cloningBehavior = newValue;
   }
 
   /**
-   *  Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test.
+   *  Indicates whether this {@link DoFnTester} will clone the {@link OldDoFn} under test.
    */
   public CloningBehavior getCloningBehavior() {
     return cloningBehavior;
@@ -165,7 +165,7 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * A convenience method for testing {@link DoFn DoFns} with bundles of elements.
+   * A convenience method for testing {@link OldDoFn DoFns} with bundles of elements.
    * Logic proceeds as follows:
    *
    * <ol>
@@ -181,9 +181,9 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Calls {@link DoFn#startBundle} on the {@code DoFn} under test.
+   * Calls {@link OldDoFn#startBundle} on the {@code OldDoFn} under test.
    *
-   * <p>If needed, first creates a fresh instance of the DoFn under test.
+   * <p>If needed, first creates a fresh instance of the OldDoFn under test.
    */
   public void startBundle() throws Exception {
     resetState();
@@ -195,14 +195,14 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Calls {@link DoFn#processElement} on the {@code DoFn} under test, in a
-   * context where {@link DoFn.ProcessContext#element} returns the
+   * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
+   * context where {@link OldDoFn.ProcessContext#element} returns the
    * given element.
    *
    * <p>Will call {@link #startBundle} automatically, if it hasn't
    * already been called.
    *
-   * @throws IllegalStateException if the {@code DoFn} under test has already
+   * @throws IllegalStateException if the {@code OldDoFn} under test has already
    * been finished
    */
   public void processElement(InputT element) throws Exception {
@@ -216,12 +216,12 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Calls {@link DoFn#finishBundle} of the {@code DoFn} under test.
+   * Calls {@link OldDoFn#finishBundle} of the {@code OldDoFn} under test.
    *
    * <p>Will call {@link #startBundle} automatically, if it hasn't
    * already been called.
    *
-   * @throws IllegalStateException if the {@code DoFn} under test has already
+   * @throws IllegalStateException if the {@code OldDoFn} under test has already
    * been finished
    */
   public void finishBundle() throws Exception {
@@ -403,18 +403,18 @@ public class DoFnTester<InputT, OutputT> {
     return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList());
   }
 
-  private TestContext<InputT, OutputT> createContext(DoFn<InputT, OutputT> fn) {
+  private TestContext<InputT, OutputT> createContext(OldDoFn<InputT, OutputT> fn) {
     return new TestContext<>(fn, options, mainOutputTag, outputs, accumulators);
   }
 
-  private static class TestContext<InT, OutT> extends DoFn<InT, OutT>.Context {
+  private static class TestContext<InT, OutT> extends OldDoFn<InT, OutT>.Context {
     private final PipelineOptions opts;
     private final TupleTag<OutT> mainOutputTag;
     private final Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
     private final Map<String, Object> accumulators;
 
     public TestContext(
-        DoFn<InT, OutT> fn,
+        OldDoFn<InT, OutT> fn,
         PipelineOptions opts,
         TupleTag<OutT> mainOutputTag,
         Map<TupleTag<?>, List<WindowedValue<?>>> outputs,
@@ -498,7 +498,7 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   private TestProcessContext<InputT, OutputT> createProcessContext(
-      DoFn<InputT, OutputT> fn,
+      OldDoFn<InputT, OutputT> fn,
       InputT elem) {
     return new TestProcessContext<>(fn,
         createContext(fn),
@@ -507,14 +507,14 @@ public class DoFnTester<InputT, OutputT> {
         sideInputs);
   }
 
-  private static class TestProcessContext<InT, OutT> extends DoFn<InT, OutT>.ProcessContext {
+  private static class TestProcessContext<InT, OutT> extends OldDoFn<InT, OutT>.ProcessContext {
     private final TestContext<InT, OutT> context;
     private final TupleTag<OutT> mainOutputTag;
     private final WindowedValue<InT> element;
     private final Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs;
 
     private TestProcessContext(
-        DoFn<InT, OutT> fn,
+        OldDoFn<InT, OutT> fn,
         TestContext<InT, OutT> context,
         WindowedValue<InT> element,
         TupleTag<OutT> mainOutputTag,
@@ -643,15 +643,15 @@ public class DoFnTester<InputT, OutputT> {
     protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new IllegalStateException("Aggregators should not be created within ProcessContext. "
-          + "Instead, create an aggregator at DoFn construction time with createAggregatorForDoFn,"
-          + " and ensure they are set up by the time startBundle is called "
-          + "with setupDelegateAggregators.");
+          + "Instead, create an aggregator at OldDoFn construction time with"
+          + " createAggregatorForDoFn, and ensure they are set up by the time startBundle is"
+          + " called with setupDelegateAggregators.");
     }
   }
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /** The possible states of processing a DoFn. */
+  /** The possible states of processing a OldDoFn. */
   enum State {
     UNSTARTED,
     STARTED,
@@ -660,35 +660,35 @@ public class DoFnTester<InputT, OutputT> {
 
   private final PipelineOptions options = PipelineOptionsFactory.create();
 
-  /** The original DoFn under test. */
-  private final DoFn<InputT, OutputT> origFn;
+  /** The original OldDoFn under test. */
+  private final OldDoFn<InputT, OutputT> origFn;
 
   /**
-   * Whether to clone the original {@link DoFn} or just use it as-is.
+   * Whether to clone the original {@link OldDoFn} or just use it as-is.
    *
-   * <p></p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be.
+   * <p></p>Worker-side {@link OldDoFn DoFns} may not be serializable, and are not required to be.
    */
   private CloningBehavior cloningBehavior = CloningBehavior.CLONE;
 
-  /** The side input values to provide to the DoFn under test. */
+  /** The side input values to provide to the OldDoFn under test. */
   private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
       new HashMap<>();
 
   private Map<String, Object> accumulators;
 
-  /** The output tags used by the DoFn under test. */
+  /** The output tags used by the OldDoFn under test. */
   private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
 
-  /** The original DoFn under test, if started. */
-  DoFn<InputT, OutputT> fn;
+  /** The original OldDoFn under test, if started. */
+  OldDoFn<InputT, OutputT> fn;
 
   /** The ListOutputManager to examine the outputs. */
   private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
 
-  /** The state of processing of the DoFn under test. */
+  /** The state of processing of the OldDoFn under test. */
   private State state;
 
-  private DoFnTester(DoFn<InputT, OutputT> origFn) {
+  private DoFnTester(OldDoFn<InputT, OutputT> origFn) {
     this.origFn = origFn;
     resetState();
   }
@@ -705,7 +705,7 @@ public class DoFnTester<InputT, OutputT> {
     if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
       fn = origFn;
     } else {
-      fn = (DoFn<InputT, OutputT>)
+      fn = (OldDoFn<InputT, OutputT>)
           SerializableUtils.deserializeFromByteArray(
               SerializableUtils.serializeToByteArray(origFn),
               origFn.toString());