You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:52:55 UTC

[37/51] [abbrv] incubator-beam git commit: Port easy Java SDK tests to new DoFn

Port easy Java SDK tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1959ddbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1959ddbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1959ddbe

Branch: refs/heads/python-sdk
Commit: 1959ddbedb2ad61824bf28e1e9139cc677a2aaf5
Parents: f5011e5
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/PipelineTest.java  |  8 ++---
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  6 ++--
 .../beam/sdk/coders/CoderRegistryTest.java      | 10 +++---
 .../beam/sdk/coders/SerializableCoderTest.java  | 10 +++---
 .../apache/beam/sdk/io/CountingInputTest.java   |  6 ++--
 .../apache/beam/sdk/io/CountingSourceTest.java  |  6 ++--
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  6 ++--
 .../sdk/transforms/ApproximateUniqueTest.java   |  6 ++--
 .../beam/sdk/transforms/CombineFnsTest.java     |  4 +--
 .../apache/beam/sdk/transforms/CombineTest.java | 18 +++++------
 .../apache/beam/sdk/transforms/CreateTest.java  |  4 +--
 .../apache/beam/sdk/transforms/FlattenTest.java |  8 ++---
 .../beam/sdk/transforms/GroupByKeyTest.java     |  8 ++---
 .../beam/sdk/transforms/WithTimestampsTest.java | 12 +++----
 .../display/DisplayDataEvaluatorTest.java       | 10 +++---
 .../sdk/transforms/display/DisplayDataTest.java |  6 ++--
 .../sdk/transforms/join/CoGroupByKeyTest.java   | 34 ++++++++++----------
 .../sdk/transforms/windowing/WindowTest.java    | 10 +++---
 .../sdk/transforms/windowing/WindowingTest.java | 23 ++++++-------
 .../beam/sdk/values/PCollectionTupleTest.java   |  6 ++--
 .../apache/beam/sdk/values/TypedPValueTest.java | 10 +++---
 21 files changed, 106 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 5137031..8b86499 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -146,9 +146,9 @@ public class PipelineTest {
 
   private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
       final String suffix) {
-    return ParDo.of(new OldDoFn<String, String>() {
-      @Override
-      public void processElement(OldDoFn<String, String>.ProcessContext c) {
+    return ParDo.of(new DoFn<String, String>() {
+      @ProcessElement
+      public void processElement(DoFn<String, String>.ProcessContext c) {
         c.output(c.element() + suffix);
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 41d0932..3b13e35 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -134,8 +134,8 @@ public class AvroCoderTest {
     }
   }
 
-  private static class GetTextFn extends OldDoFn<Pojo, String> {
-    @Override
+  private static class GetTextFn extends DoFn<Pojo, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().text);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 35ec6c6..da15405 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CloudObject;
@@ -366,8 +366,8 @@ public class CoderRegistryTest {
   private static class PTransformOutputingMySerializableGeneric
   extends PTransform<PCollection<String>, PCollection<KV<String, MySerializableGeneric<String>>>> {
 
-    private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<String>>> {
-      @Override
+    private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<String>>> {
+      @ProcessElement
       public void processElement(ProcessContext c) { }
     }
 
@@ -430,8 +430,8 @@ public class CoderRegistryTest {
       PCollection<String>,
       PCollection<KV<String, MySerializableGeneric<T>>>> {
 
-    private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<T>>> {
-      @Override
+    private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<T>>> {
+      @ProcessElement
       public void processElement(ProcessContext c) { }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index 3e7fd50..b5465fa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -82,15 +82,15 @@ public class SerializableCoderTest implements Serializable {
     }
   }
 
-  static class StringToRecord extends OldDoFn<String, MyRecord> {
-    @Override
+  static class StringToRecord extends DoFn<String, MyRecord> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(new MyRecord(c.element()));
     }
   }
 
-  static class RecordToString extends OldDoFn<MyRecord, String> {
-    @Override
+  static class RecordToString extends DoFn<MyRecord, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 95f7454..4ec2c9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -120,8 +120,8 @@ public class CountingInputTest {
     assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
   }
 
-  private static class ElementValueDiff extends OldDoFn<Long, Long> {
-    @Override
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element() - c.timestamp().getMillis());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 45f636f..0bd91c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -159,8 +159,8 @@ public class CountingSourceTest {
     p.run();
   }
 
-  private static class ElementValueDiff extends OldDoFn<Long, Long> {
-    @Override
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element() - c.timestamp().getMillis());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index f8592c9..db03a5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.PubsubClient;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest {
   private static final String ID_LABEL = "id";
   private static final int NUM_SHARDS = 10;
 
-  private static class Stamp extends OldDoFn<String, String> {
-    @Override
+  private static class Stamp extends DoFn<String, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 5c8732f..7b6d671 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -54,7 +54,7 @@ import java.util.List;
  */
 @RunWith(JUnit4.class)
 public class ApproximateUniqueTest implements Serializable {
-  // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses
+  // implements Serializable just to make it easy to use anonymous inner DoFn subclasses
 
   @Test
   public void testEstimationErrorToSampleSize() {
@@ -223,8 +223,8 @@ public class ApproximateUniqueTest implements Serializable {
             .apply(View.<Long>asSingleton());
 
     PCollection<KV<Long, Long>> approximateAndExact = approximate
-        .apply(ParDo.of(new OldDoFn<Long, KV<Long, Long>>() {
-              @Override
+        .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(KV.of(c.element(), c.sideInput(exact)));
               }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index d6bf826..95ba1aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -461,7 +461,7 @@ public class  CombineFnsTest {
   }
 
   private static class ExtractResultDoFn
-      extends OldDoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
+      extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
 
     private final TupleTag<Integer> maxIntTag;
     private final TupleTag<UserString> concatStringTag;
@@ -471,7 +471,7 @@ public class  CombineFnsTest {
       this.concatStringTag = concatStringTag;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       UserString userString = c.element().getValue().get(concatStringTag);
       KV<Integer, String> value = KV.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index cb9928e..6421b3b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -117,7 +117,7 @@ public class CombineTest implements Serializable {
     1, 1, 2, 3, 5, 8, 13, 21, 34, 55
   };
 
-  @Mock private OldDoFn<?, ?>.ProcessContext processContext;
+  @Mock private DoFn<?, ?>.ProcessContext processContext;
 
   PCollection<KV<String, Integer>> createInput(Pipeline p,
                                                KV<String, Integer>[] table) {
@@ -372,8 +372,8 @@ public class CombineTest implements Serializable {
     pipeline.run();
   }
 
-  private static class FormatPaneInfo extends OldDoFn<Integer, String> {
-    @Override
+  private static class FormatPaneInfo extends DoFn<Integer, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element() + ": " + c.pane().isLast());
     }
@@ -560,8 +560,8 @@ public class CombineTest implements Serializable {
     pipeline.run();
   }
 
-  private static class GetLast extends OldDoFn<Integer, Integer> {
-    @Override
+  private static class GetLast extends DoFn<Integer, Integer> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.pane().isLast()) {
         c.output(c.element());
@@ -653,8 +653,8 @@ public class CombineTest implements Serializable {
 
     PCollection<Integer> output = pipeline
         .apply("CreateVoidMainInput", Create.of((Void) null))
-        .apply("OutputSideInput", ParDo.of(new OldDoFn<Void, Integer>() {
-                  @Override
+        .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
@@ -1176,8 +1176,8 @@ public class CombineTest implements Serializable {
   }
 
   private static <T> PCollection<T> copy(PCollection<T> pc, final int n) {
-    return pc.apply(ParDo.of(new OldDoFn<T, T>() {
-      @Override
+    return pc.apply(ParDo.of(new DoFn<T, T>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         for (int i = 0; i < n; i++) {
           c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index cf65423..9db0136 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -229,8 +229,8 @@ public class CreateTest {
     p.run();
   }
 
-  private static class PrintTimestamps extends OldDoFn<String, String> {
-    @Override
+  private static class PrintTimestamps extends DoFn<String, String> {
+    @ProcessElement
       public void processElement(ProcessContext c) {
       c.output(c.element() + ":" + c.timestamp().getMillis());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index b81eedb..604536b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -130,8 +130,8 @@ public class FlattenTest implements Serializable {
 
     PCollection<String> output = p
         .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-        .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() {
-                  @Override
+        .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String side : c.sideInput(view)) {
                       c.output(side);
@@ -339,8 +339,8 @@ public class FlattenTest implements Serializable {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static class IdentityFn<T> extends OldDoFn<T, T> {
-    @Override
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 15c3ba8..afe460f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -371,14 +371,14 @@ public class GroupByKeyTest {
     pipeline.run();
   }
 
-  private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> {
+  private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> {
     private final Instant timestamp;
 
     public AssertTimestamp(Instant timestamp) {
       this.timestamp = timestamp;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       assertThat(c.timestamp(), equalTo(timestamp));
     }
@@ -506,9 +506,9 @@ public class GroupByKeyTest {
    * Creates a KV that wraps the original KV together with a random key.
    */
   static class AssignRandomKey
-      extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
+      extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index d2ba452..e381470 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable {
          .apply(WithTimestamps.of(timestampFn));
 
     PCollection<KV<String, Instant>> timestampedVals =
-        timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
-          @Override
-          public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+        timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+          @ProcessElement
+          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
               throws Exception {
             c.output(KV.of(c.element(), c.timestamp()));
           }
@@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable {
              WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L)));
 
     PCollection<KV<String, Instant>> timestampedVals =
-        timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
-          @Override
-          public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+        timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+          @ProcessElement
+          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
               throws Exception {
             c.output(KV.of(c.element(), c.timestamp()));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index c1848c6..e233114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
@@ -50,8 +50,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
         new PTransform<PCollection<String>, POutput> () {
           @Override
           public PCollection<String> apply(PCollection<String> input) {
-            return input.apply(ParDo.of(new OldDoFn<String, String>() {
-              @Override
+            return input.apply(ParDo.of(new DoFn<String, String>() {
+              @ProcessElement
               public void processElement(ProcessContext c) throws Exception {
                 c.output(c.element());
               }
@@ -79,8 +79,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
   @Test
   public void testPrimitiveTransform() {
     PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of(
-        new OldDoFn<Integer, Integer>() {
-      @Override
+        new DoFn<Integer, Integer>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {}
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 517f968..e2f38b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -1053,8 +1053,8 @@ public class DisplayDataTest implements Serializable {
   private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
     @Override
     public PCollection<T> apply(PCollection<T> input) {
-      return input.apply(ParDo.of(new OldDoFn<T, T>() {
-        @Override
+      return input.apply(ParDo.of(new DoFn<T, T>() {
+        @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           c.output(c.element());
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 97667a3..c6f82ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -29,9 +29,8 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -84,10 +83,11 @@ public class CoGroupByKeyTest implements Serializable {
       input = p.apply("Create" + name, Create.timestamped(list, timestamps)
           .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
     }
-    return input
-            .apply("Identity" + name, ParDo.of(new OldDoFn<KV<Integer, String>,
-                                                 KV<Integer, String>>() {
-              @Override
+    return input.apply(
+        "Identity" + name,
+        ParDo.of(
+            new DoFn<KV<Integer, String>, KV<Integer, String>>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(c.element());
               }
@@ -313,11 +313,11 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   /**
-   * A OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the
-   * results of a CoGroupByKey.
+   * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the results of a
+   * CoGroupByKey.
    */
-  private static class ClickOfPurchaseFn extends
-      OldDoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess {
+  private static class ClickOfPurchaseFn
+      extends DoFn<KV<Integer, CoGbkResult>, KV<String, String>> {
     private final TupleTag<String> clicksTag;
 
     private final TupleTag<String> purchasesTag;
@@ -329,9 +329,9 @@ public class CoGroupByKeyTest implements Serializable {
       this.purchasesTag = purchasesTag;
     }
 
-    @Override
-    public void processElement(ProcessContext c) {
-      BoundedWindow w = c.window();
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      BoundedWindow w = window;
       KV<Integer, CoGbkResult> e = c.element();
       CoGbkResult row = e.getValue();
       Iterable<String> clicks = row.getAll(clicksTag);
@@ -347,11 +347,11 @@ public class CoGroupByKeyTest implements Serializable {
 
 
   /**
-   * A OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the
+   * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the
    * results of a CoGroupByKey.
    */
   private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends
-      OldDoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
+      DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
     private final TupleTag<String> purchasesTag;
 
     private final TupleTag<String> addressesTag;
@@ -367,7 +367,7 @@ public class CoGroupByKeyTest implements Serializable {
       this.namesTag = namesTag;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<Integer, CoGbkResult> e = c.element();
       CoGbkResult row = e.getValue();
@@ -401,7 +401,7 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   /**
-   * Tests that the consuming OldDoFn
+   * Tests that the consuming DoFn
    * (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected.
    */
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 27d2539..c583860 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -199,8 +199,8 @@ public class WindowTest implements Serializable {
         .apply(GroupByKey.<Integer, String>create())
         .apply(
             ParDo.of(
-                new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
-                  @Override
+                new DoFn<KV<Integer, Iterable<String>>, Void>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) throws Exception {
                     assertThat(
                         c.timestamp(),
@@ -231,8 +231,8 @@ public class WindowTest implements Serializable {
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
             .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
         .apply(GroupByKey.<Integer, String>create())
-        .apply(ParDo.of(new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
-          @Override
+        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
+          @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1)));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 622a277..159e700 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -26,9 +26,8 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -58,12 +57,14 @@ public class WindowingTest implements Serializable {
 
   private static class WindowedCount extends PTransform<PCollection<String>, PCollection<String>> {
 
-    private final class FormatCountsDoFn
-        extends OldDoFn<KV<String, Long>, String> implements RequiresWindowAccess {
-      @Override
-          public void processElement(ProcessContext c) {
-        c.output(c.element().getKey() + ":" + c.element().getValue()
-            + ":" + c.timestamp().getMillis() + ":" + c.window());
+    private final class FormatCountsDoFn extends DoFn<KV<String, Long>, String> {
+      @ProcessElement
+      public void processElement(ProcessContext c, BoundedWindow window) {
+        c.output(
+            c.element().getKey()
+                + ":" + c.element().getValue()
+                + ":" + c.timestamp().getMillis()
+                + ":" + window);
       }
     }
     private WindowFn<? super String, ?> windowFn;
@@ -234,9 +235,9 @@ public class WindowingTest implements Serializable {
     p.run();
   }
 
-  /** A OldDoFn that tokenizes lines of text into individual words. */
-  static class ExtractWordsWithTimestampsFn extends OldDoFn<String, String> {
-    @Override
+  /** A DoFn that tokenizes lines of text into individual words. */
+  static class ExtractWordsWithTimestampsFn extends DoFn<String, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] words = c.element().split("[^a-zA-Z0-9']+");
       if (words.length == 2) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 547c778..13218b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -75,8 +75,8 @@ public final class PCollectionTupleTest implements Serializable {
         .apply(Create.of(inputs));
 
     PCollectionTuple outputs = mainInput.apply(ParDo
-        .of(new OldDoFn<Integer, Integer>() {
-          @Override
+        .of(new DoFn<Integer, Integer>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.sideOutput(sideOutputTag, c.element());
           }})

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index c525cf1..287223f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 
 import org.junit.Rule;
@@ -44,9 +44,9 @@ public class TypedPValueTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private static class IdentityDoFn extends OldDoFn<Integer, Integer> {
+  private static class IdentityDoFn extends DoFn<Integer, Integer> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
@@ -129,9 +129,9 @@ public class TypedPValueTest {
   static class EmptyClass {
   }
 
-  private static class EmptyClassDoFn extends OldDoFn<Integer, EmptyClass> {
+  private static class EmptyClassDoFn extends DoFn<Integer, EmptyClass> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(new EmptyClass());
     }