You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 03:10:43 UTC

[1/9] incubator-beam git commit: Port easy transforms to new DoFn

Repository: incubator-beam
Updated Branches:
  refs/heads/master fcf6b1d34 -> 8daf518bc


Port easy transforms to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47341e11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47341e11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47341e11

Branch: refs/heads/master
Commit: 47341e113334827101ddbf775c69ae34d178cd8f
Parents: 269fbf3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:27:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/Count.java  |  4 ++--
 .../java/org/apache/beam/sdk/transforms/Create.java |  4 ++--
 .../apache/beam/sdk/transforms/FlatMapElements.java |  4 ++--
 .../org/apache/beam/sdk/transforms/Flatten.java     |  4 ++--
 .../java/org/apache/beam/sdk/transforms/Keys.java   |  4 ++--
 .../java/org/apache/beam/sdk/transforms/KvSwap.java |  4 ++--
 .../org/apache/beam/sdk/transforms/MapElements.java |  4 ++--
 .../org/apache/beam/sdk/transforms/Partition.java   |  4 ++--
 .../beam/sdk/transforms/RemoveDuplicates.java       |  4 ++--
 .../java/org/apache/beam/sdk/transforms/Sample.java |  6 +++---
 .../java/org/apache/beam/sdk/transforms/Values.java |  4 ++--
 .../java/org/apache/beam/sdk/transforms/View.java   |  8 ++++----
 .../org/apache/beam/sdk/transforms/WithKeys.java    |  4 ++--
 .../apache/beam/sdk/transforms/WithTimestamps.java  |  6 +++---
 .../beam/sdk/transforms/join/CoGroupByKey.java      | 16 ++++++++--------
 15 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 7601ffc..ac59c76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,8 +107,8 @@ public class Count {
     public PCollection<KV<T, Long>> apply(PCollection<T> input) {
       return
           input
-          .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
-            @Override
+          .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
+            @ProcessElement
             public void processElement(ProcessContext c) {
               c.output(KV.of(c.element(), (Void) null));
             }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index fb7f784..08d0a7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -486,8 +486,8 @@ public class Create<T> {
       this.elementCoder = elementCoder;
     }
 
-    private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> {
-      @Override
+    private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index b48da38..694592e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -133,9 +133,9 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
+    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
       private static final long serialVersionUID = 0L;
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) {
         for (OutputT element : fn.apply(c.element())) {
           c.output(element);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 53e898e..7e09d7e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -174,8 +174,8 @@ public class Flatten {
       Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
 
       return in.apply("FlattenIterables", ParDo.of(
-          new OldDoFn<Iterable<T>, T>() {
-            @Override
+          new DoFn<Iterable<T>, T>() {
+            @ProcessElement
             public void processElement(ProcessContext c) {
               for (T i : c.element()) {
                 c.output(i);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index c8cbce8..5ac1866 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,8 +58,8 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
   @Override
   public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
     return
-        in.apply("Keys", ParDo.of(new OldDoFn<KV<K, ?>, K>() {
-          @Override
+        in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.element().getKey());
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 430d37b..d4386d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,8 +62,8 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
   @Override
   public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
     return
-        in.apply("KvSwap", ParDo.of(new OldDoFn<KV<K, V>, KV<V, K>>() {
-          @Override
+        in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             KV<K, V> e = c.element();
             c.output(KV.of(e.getValue(), e.getKey()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index c83c39f..b7b9a5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -104,8 +104,8 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
-      @Override
+    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(fn.apply(c.element()));
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 2ddcc29..05c9470 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -134,7 +134,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
     this.partitionDoFn = partitionDoFn;
   }
 
-  private static class PartitionDoFn<X> extends OldDoFn<X, Void> {
+  private static class PartitionDoFn<X> extends DoFn<X, Void> {
     private final int numPartitions;
     private final PartitionFn<? super X> partitionFn;
     private final TupleTagList outputTags;
@@ -163,7 +163,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
       return outputTags;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       X input = c.element();
       int partition = partitionFn.partitionFor(input, numPartitions);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index d82c457..bba4b51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,8 +85,8 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
   @Override
   public PCollection<T> apply(PCollection<T> in) {
     return in
-        .apply("CreateIndex", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
-          @Override
+        .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(KV.of(c.element(), (Void) null));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 724b252..12ff2b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -164,9 +164,9 @@ public class Sample {
   }
 
   /**
-   * A {@link OldDoFn} that returns up to limit elements from the side input PCollection.
+   * A {@link DoFn} that returns up to limit elements from the side input PCollection.
    */
-  private static class SampleAnyDoFn<T> extends OldDoFn<Void, T> {
+  private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
     long limit;
     final PCollectionView<Iterable<T>> iterableView;
 
@@ -175,7 +175,7 @@ public class Sample {
       this.iterableView = iterableView;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       for (T i : c.sideInput(iterableView)) {
         if (limit-- <= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 856e32a..34342db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,8 +58,8 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
   @Override
   public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
     return
-        in.apply("Values", ParDo.of(new OldDoFn<KV<?, V>, V>() {
-          @Override
+        in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.element().getValue());
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 8a61637..7a97c13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -38,7 +38,7 @@ import java.util.Map;
  *
  * <p>When a {@link ParDo} tranform is processing a main input
  * element in a window {@code w} and a {@link PCollectionView} is read via
- * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
+ * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
  * returned.
  *
  * <p>The SDK supports viewing a {@link PCollection}, per window, as a single value,
@@ -118,7 +118,7 @@ import java.util.Map;
  *
  * PCollection PageVisits = urlVisits
  *     .apply(ParDo.withSideInputs(urlToPage)
- *         .of(new OldDoFn<UrlVisit, PageVisit>() {
+ *         .of(new DoFn<UrlVisit, PageVisit>() {
  *             {@literal @}Override
  *             void processElement(ProcessContext context) {
  *               UrlVisit urlVisit = context.element();
@@ -154,11 +154,11 @@ public class View {
    *
    * <p>If the input {@link PCollection} is empty,
    * throws {@link java.util.NoSuchElementException} in the consuming
-   * {@link OldDoFn}.
+   * {@link DoFn}.
    *
    * <p>If the input {@link PCollection} contains more than one
    * element, throws {@link IllegalArgumentException} in the
-   * consuming {@link OldDoFn}.
+   * consuming {@link DoFn}.
    */
   public static <T> AsSingleton<T> asSingleton() {
     return new AsSingleton<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 37d45aa..2a44963 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,8 +113,8 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
   @Override
   public PCollection<KV<K, V>> apply(PCollection<V> in) {
     PCollection<KV<K, V>> result =
-        in.apply("AddKeys", ParDo.of(new OldDoFn<V, KV<K, V>>() {
-          @Override
+        in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(KV.of(fn.apply(c.element()),
                 c.element()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 41b549b..7b395f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -92,7 +92,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    * Returns the allowed timestamp skew duration, which is the maximum
    * duration that timestamps can be shifted backwards from the timestamp of the input element.
    *
-   * @see OldDoFn#getAllowedTimestampSkew()
+   * @see DoFn#getAllowedTimestampSkew()
    */
   public Duration getAllowedTimestampSkew() {
     return allowedTimestampSkew;
@@ -105,7 +105,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
         .setTypeDescriptorInternal(input.getTypeDescriptor());
   }
 
-  private static class AddTimestampsDoFn<T> extends OldDoFn<T, T> {
+  private static class AddTimestampsDoFn<T> extends DoFn<T, T> {
     private final SerializableFunction<T, Instant> fn;
     private final Duration allowedTimestampSkew;
 
@@ -114,7 +114,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
       this.allowedTimestampSkew = allowedTimestampSkew;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Instant timestamp = fn.apply(c.element());
       checkNotNull(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index 1bd9f4a..cb06f95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.transforms.join;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
@@ -57,7 +57,7 @@ import java.util.List;
  *
  * PCollection<T> finalResultCollection =
  *   coGbkResultCollection.apply(ParDo.of(
- *     new OldDoFn<KV<K, CoGbkResult>, T>() {
+ *     new DoFn<KV<K, CoGbkResult>, T>() {
  *       @Override
  *       public void processElement(ProcessContext c) {
  *         KV<K, CoGbkResult> e = c.element();
@@ -167,12 +167,12 @@ public class CoGroupByKey<K> extends
   }
 
   /**
-   * A OldDoFn to construct a UnionTable (i.e., a
+   * A DoFn to construct a UnionTable (i.e., a
    * {@code PCollection<KV<K, RawUnionValue>>} from a
    * {@code PCollection<KV<K, V>>}.
    */
   private static class ConstructUnionTableFn<K, V> extends
-      OldDoFn<KV<K, V>, KV<K, RawUnionValue>> {
+      DoFn<KV<K, V>, KV<K, RawUnionValue>> {
 
     private final int index;
 
@@ -180,7 +180,7 @@ public class CoGroupByKey<K> extends
       this.index = index;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<K, ?> e = c.element();
       c.output(KV.of(e.getKey(), new RawUnionValue(index, e.getValue())));
@@ -188,11 +188,11 @@ public class CoGroupByKey<K> extends
   }
 
   /**
-   * A OldDoFn to construct a CoGbkResult from an input grouped union
+   * A DoFn to construct a CoGbkResult from an input grouped union
    * table.
     */
   private static class ConstructCoGbkResultFn<K>
-    extends OldDoFn<KV<K, Iterable<RawUnionValue>>,
+    extends DoFn<KV<K, Iterable<RawUnionValue>>,
                      KV<K, CoGbkResult>> {
 
     private final CoGbkResultSchema schema;
@@ -201,7 +201,7 @@ public class CoGroupByKey<K> extends
       this.schema = schema;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<K, Iterable<RawUnionValue>> e = c.element();
       c.output(KV.of(e.getKey(), new CoGbkResult(schema, e.getValue())));


[4/9] incubator-beam git commit: Correctly determine if DoFn has an anonymous class in ParDo

Posted by ke...@apache.org.
Correctly determine if DoFn has an anonymous class in ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b186529
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b186529
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b186529

Branch: refs/heads/master
Commit: 0b1865295cb89d88878d0a021df103ed45240924
Parents: fcf6b1d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 14:54:56 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0b186529/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index bb1af9c..91f6203 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -958,7 +958,7 @@ public class ParDo {
     @Override
     protected String getKindString() {
       Class<?> clazz = DoFnReflector.getDoFnClass(fn);
-      if (fn.getClass().isAnonymousClass()) {
+      if (clazz.isAnonymousClass()) {
         return "AnonymousParMultiDo";
       } else {
         return String.format("ParMultiDo(%s)", StringUtils.approximateSimpleName(clazz));


[6/9] incubator-beam git commit: Port PAssert to new DoFn

Posted by ke...@apache.org.
Port PAssert to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef5e31f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef5e31f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef5e31f8

Branch: refs/heads/master
Commit: ef5e31f8b79dcedf8feb4bba0e313bfcf330ab1e
Parents: 1959ddb
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/testing/PAssert.java    | 39 ++++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef5e31f8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 80340c2..e07ee3d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -33,11 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -762,8 +761,8 @@ public class PAssert {
           .apply("RewindowActuals", rewindowActuals.<T>windowActuals())
           .apply(
               ParDo.of(
-                  new OldDoFn<T, T>() {
-                    @Override
+                  new DoFn<T, T>() {
+                    @ProcessElement
                     public void processElement(ProcessContext context) throws CoderException {
                       context.output(CoderUtils.clone(coder, context.element()));
                     }
@@ -884,8 +883,8 @@ public class PAssert {
     }
   }
 
-  private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> {
-    @Override
+  private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(Iterables.concat(c.element()));
     }
@@ -995,13 +994,13 @@ public class PAssert {
   }
 
   /**
-   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
    * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
    */
-  private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> {
+  private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1015,7 +1014,7 @@ public class PAssert {
       this.actual = actual;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = c.sideInput(actual);
@@ -1030,13 +1029,13 @@ public class PAssert {
   }
 
   /**
-   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single iterable element of the input {@link PCollection} and adjusts counters and
    * thrown exceptions for use in testing.
    *
    * <p>The singleton property is presumed, not enforced.
    */
-  private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> {
+  private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1047,7 +1046,7 @@ public class PAssert {
       this.checkerFn = checkerFn;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       try {
         doChecks(c.element(), checkerFn, success, failure);
@@ -1061,14 +1060,14 @@ public class PAssert {
   }
 
   /**
-   * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * the single item contained within the single iterable on input and
    * adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
    * each input element must be a singleton iterable, or this will fail.
    */
-  private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> {
+  private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1079,7 +1078,7 @@ public class PAssert {
       this.checkerFn = checkerFn;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = Iterables.getOnlyElement(c.element());
@@ -1310,7 +1309,7 @@ public class PAssert {
   }
 
   /**
-   * A OldDoFn that filters elements based on their presence in a static collection of windows.
+   * A DoFn that filters elements based on their presence in a static collection of windows.
    */
   private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
     private final StaticWindows windows;
@@ -1324,10 +1323,10 @@ public class PAssert {
       return input.apply("FilterWindows", ParDo.of(new Fn()));
     }
 
-    private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        if (windows.getWindows().contains(c.window())) {
+    private class Fn extends DoFn<T, T> {
+      @ProcessElement
+      public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+        if (windows.getWindows().contains(window)) {
           c.output(c.element());
         }
       }


[5/9] incubator-beam git commit: Port mentions of OldDoFn in PipelineOptions

Posted by ke...@apache.org.
Port mentions of OldDoFn in PipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5011e5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5011e5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5011e5c

Branch: refs/heads/master
Commit: f5011e5c62cb00fb4d8a91bd7d55d5083789a307
Parents: 620bd99
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 19:56:33 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/options/PipelineOptions.java   | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5011e5c/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 365f668..4595fc8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -22,8 +22,8 @@ import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 
 import com.google.auto.service.AutoService;
@@ -35,7 +35,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.lang.reflect.Proxy;
 import java.util.ServiceLoader;
-
 import javax.annotation.concurrent.ThreadSafe;
 
 /**
@@ -52,7 +51,7 @@ import javax.annotation.concurrent.ThreadSafe;
  * and {@link PipelineOptionsFactory#as(Class)}. They can be created
  * from command-line arguments with {@link PipelineOptionsFactory#fromArgs(String[])}.
  * They can be converted to another type by invoking {@link PipelineOptions#as(Class)} and
- * can be accessed from within a {@link OldDoFn} by invoking
+ * can be accessed from within a {@link DoFn} by invoking
  * {@link Context#getPipelineOptions()}.
  *
  * <p>For example:
@@ -151,7 +150,7 @@ import javax.annotation.concurrent.ThreadSafe;
  * {@link PipelineOptionsFactory#withValidation()} is invoked.
  *
  * <p>{@link JsonIgnore @JsonIgnore} is used to prevent a property from being serialized and
- * available during execution of {@link OldDoFn}. See the Serialization section below for more
+ * available during execution of {@link DoFn}. See the Serialization section below for more
  * details.
  *
  * <h2>Registration Of PipelineOptions</h2>


[3/9] incubator-beam git commit: Port join library to new DoFn

Posted by ke...@apache.org.
Port join library to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/620bd994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/620bd994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/620bd994

Branch: refs/heads/master
Commit: 620bd9949a6176ddd1903687fe9b8ba8c5822367
Parents: a1c06d7
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 19:55:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/extensions/joinlibrary/Join.java  | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/620bd994/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
index 88836f9..f4e6ccb 100644
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.joinlibrary;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -59,8 +59,8 @@ public class Join {
         .apply(CoGroupByKey.<K>create());
 
     return coGbkResultCollection.apply(ParDo.of(
-      new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @ProcessElement
         public void processElement(ProcessContext c) {
           KV<K, CoGbkResult> e = c.element();
 
@@ -108,8 +108,8 @@ public class Join {
         .apply(CoGroupByKey.<K>create());
 
     return coGbkResultCollection.apply(ParDo.of(
-      new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @ProcessElement
         public void processElement(ProcessContext c) {
           KV<K, CoGbkResult> e = c.element();
 
@@ -161,8 +161,8 @@ public class Join {
         .apply(CoGroupByKey.<K>create());
 
     return coGbkResultCollection.apply(ParDo.of(
-      new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @ProcessElement
         public void processElement(ProcessContext c) {
           KV<K, CoGbkResult> e = c.element();
 


[9/9] incubator-beam git commit: This closes #782

Posted by ke...@apache.org.
This closes #782


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8daf518b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8daf518b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8daf518b

Branch: refs/heads/master
Commit: 8daf518bccfe425082c7d0b3f31f3623ff67e000
Parents: fcf6b1d 47341e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 20:10:55 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 20:10:55 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  3 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 14 +++----
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 17 ++++-----
 .../beam/sdk/io/PubsubUnboundedSource.java      |  7 ++--
 .../beam/sdk/options/PipelineOptions.java       |  9 ++---
 .../org/apache/beam/sdk/testing/PAssert.java    | 39 ++++++++++----------
 .../org/apache/beam/sdk/transforms/Count.java   |  4 +-
 .../org/apache/beam/sdk/transforms/Create.java  |  4 +-
 .../beam/sdk/transforms/DoFnReflector.java      |  6 +++
 .../beam/sdk/transforms/FlatMapElements.java    |  4 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |  4 +-
 .../org/apache/beam/sdk/transforms/Keys.java    |  4 +-
 .../org/apache/beam/sdk/transforms/KvSwap.java  |  4 +-
 .../apache/beam/sdk/transforms/MapElements.java |  4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  2 +-
 .../apache/beam/sdk/transforms/Partition.java   |  4 +-
 .../beam/sdk/transforms/RemoveDuplicates.java   |  4 +-
 .../org/apache/beam/sdk/transforms/Sample.java  |  6 +--
 .../org/apache/beam/sdk/transforms/Values.java  |  4 +-
 .../org/apache/beam/sdk/transforms/View.java    |  8 ++--
 .../apache/beam/sdk/transforms/WithKeys.java    |  4 +-
 .../beam/sdk/transforms/WithTimestamps.java     |  6 +--
 .../beam/sdk/transforms/join/CoGroupByKey.java  | 16 ++++----
 .../java/org/apache/beam/sdk/PipelineTest.java  |  8 ++--
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  6 +--
 .../beam/sdk/coders/CoderRegistryTest.java      | 10 ++---
 .../beam/sdk/coders/SerializableCoderTest.java  | 10 ++---
 .../apache/beam/sdk/io/CountingInputTest.java   |  6 +--
 .../apache/beam/sdk/io/CountingSourceTest.java  |  6 +--
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  6 +--
 .../sdk/transforms/ApproximateUniqueTest.java   |  6 +--
 .../beam/sdk/transforms/CombineFnsTest.java     |  4 +-
 .../apache/beam/sdk/transforms/CombineTest.java | 18 ++++-----
 .../apache/beam/sdk/transforms/CreateTest.java  |  4 +-
 .../apache/beam/sdk/transforms/FlattenTest.java |  8 ++--
 .../beam/sdk/transforms/GroupByKeyTest.java     |  8 ++--
 .../beam/sdk/transforms/WithTimestampsTest.java | 12 +++---
 .../display/DisplayDataEvaluatorTest.java       | 10 ++---
 .../sdk/transforms/display/DisplayDataTest.java |  6 +--
 .../sdk/transforms/join/CoGroupByKeyTest.java   | 34 ++++++++---------
 .../sdk/transforms/windowing/WindowTest.java    | 10 ++---
 .../sdk/transforms/windowing/WindowingTest.java | 23 ++++++------
 .../beam/sdk/values/PCollectionTupleTest.java   |  6 +--
 .../apache/beam/sdk/values/TypedPValueTest.java | 10 ++---
 .../beam/sdk/extensions/joinlibrary/Join.java   | 14 +++----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 36 +++++++++---------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 12 +++---
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 18 ++++-----
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 10 ++---
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    |  6 +--
 .../sdk/io/gcp/datastore/V1Beta3TestUtil.java   |  9 ++---
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 10 ++---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 19 +++++-----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 10 ++---
 54 files changed, 265 insertions(+), 267 deletions(-)
----------------------------------------------------------------------



[2/9] incubator-beam git commit: Port easy I/O transforms to new DoFn

Posted by ke...@apache.org.
Port easy I/O transforms to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/269fbf38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/269fbf38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/269fbf38

Branch: refs/heads/master
Commit: 269fbf386454ea77845e54764a125edba7039b03
Parents: ef5e31f
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:22:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  3 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 14 ++++----
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 17 +++++----
 .../beam/sdk/io/PubsubUnboundedSource.java      |  7 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 36 +++++++++-----------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 12 +++----
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 18 +++++-----
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 10 +++---
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    |  6 ++--
 .../sdk/io/gcp/datastore/V1Beta3TestUtil.java   |  9 +++--
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 10 +++---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 19 +++++------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 10 +++---
 13 files changed, 82 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index abcf415..fadd9c7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -78,6 +78,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -2715,7 +2716,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     @Nullable
     private PTransform<?, ?> transform;
     @Nullable
-    private OldDoFn<?, ?> doFn;
+    private DoFn<?, ?> doFn;
 
     /**
      * Builds an instance of this class from the overridden transform.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 1902bca..2b27175 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -709,11 +709,11 @@ public class PubsubIO {
        *
        * <p>Public so can be suppressed by runners.
        */
-      public class PubsubBoundedReader extends OldDoFn<Void, T> {
+      public class PubsubBoundedReader extends DoFn<Void, T> {
         private static final int DEFAULT_PULL_SIZE = 100;
         private static final int ACK_TIMEOUT_SEC = 60;
 
-        @Override
+        @ProcessElement
         public void processElement(ProcessContext c) throws IOException {
           try (PubsubClient pubsubClient =
                    FACTORY.newClient(timestampLabel, idLabel,
@@ -998,12 +998,12 @@ public class PubsubIO {
        *
        * <p>Public so can be suppressed by runners.
        */
-      public class PubsubBoundedWriter extends OldDoFn<T, Void> {
+      public class PubsubBoundedWriter extends DoFn<T, Void> {
         private static final int MAX_PUBLISH_BATCH_SIZE = 100;
         private transient List<OutgoingMessage> output;
         private transient PubsubClient pubsubClient;
 
-        @Override
+        @StartBundle
         public void startBundle(Context c) throws IOException {
           this.output = new ArrayList<>();
           // NOTE: idLabel is ignored.
@@ -1012,7 +1012,7 @@ public class PubsubIO {
                                 c.getPipelineOptions().as(PubsubOptions.class));
         }
 
-        @Override
+        @ProcessElement
         public void processElement(ProcessContext c) throws IOException {
           // NOTE: The record id is always null.
           OutgoingMessage message =
@@ -1025,7 +1025,7 @@ public class PubsubIO {
           }
         }
 
-        @Override
+        @FinishBundle
         public void finishBundle(Context c) throws IOException {
           if (!output.isEmpty()) {
             publish();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 9e9536d..3014751 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -65,7 +65,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-
 import javax.annotation.Nullable;
 
 /**
@@ -78,7 +77,7 @@ import javax.annotation.Nullable;
  * <li>We try to send messages in batches while also limiting send latency.
  * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
  * <li>Though some background threads are used by the underlying netty system all actual Pubsub
- * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
  * to execute concurrently and hide latency.
  * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
  * to dedup messages.
@@ -155,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   /**
    * Convert elements to messages and shard them.
    */
-  private static class ShardFn<T> extends OldDoFn<T, KV<Integer, OutgoingMessage>> {
+  private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
     private final Aggregator<Long, Long> elementCounter =
         createAggregator("elements", new Sum.SumLongFn());
     private final Coder<T> elementCoder;
@@ -168,7 +167,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       this.recordIdMethod = recordIdMethod;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       elementCounter.addValue(1L);
       byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
@@ -207,7 +206,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    * Publish messages to Pubsub in batches.
    */
   private static class WriterFn
-      extends OldDoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+      extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
     private final PubsubClientFactory pubsubFactory;
     private final TopicPath topic;
     private final String timestampLabel;
@@ -253,14 +252,14 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       byteCounter.addValue((long) bytes);
     }
 
-    @Override
+    @StartBundle
     public void startBundle(Context c) throws Exception {
       checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
       pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
                                              c.getPipelineOptions().as(PubsubOptions.class));
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
       int bytes = 0;
@@ -285,7 +284,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       }
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) throws Exception {
       pubsubClient.close();
       pubsubClient = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index d98bd6a..f99b471 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -77,7 +77,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.annotation.Nullable;
 
 /**
@@ -1107,7 +1106,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // StatsFn
   // ================================================================================
 
-  private static class StatsFn<T> extends OldDoFn<T, T> {
+  private static class StatsFn<T> extends DoFn<T, T> {
     private final Aggregator<Long, Long> elementCounter =
         createAggregator("elements", new Sum.SumLongFn());
 
@@ -1131,7 +1130,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       this.idLabel = idLabel;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       elementCounter.addValue(1L);
       c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2ba7562..ed2c32e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -103,7 +104,6 @@ import com.google.common.io.CountingOutputStream;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.apache.avro.generic.GenericRecord;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -135,7 +135,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import javax.annotation.Nullable;
 
 /**
@@ -334,7 +333,7 @@ public class BigQueryIO {
    * <p>Each {@link TableRow} contains values indexed by column name. Here is a
    * sample processing function that processes a "line" column from rows:
    * <pre>{@code
-   * static class ExtractWordsFn extends OldDoFn<TableRow, String> {
+   * static class ExtractWordsFn extends DoFn<TableRow, String> {
    *   public void processElement(ProcessContext c) {
    *     // Get the "line" field of the TableRow object, split it into words, and emit them.
    *     TableRow row = c.element();
@@ -706,8 +705,8 @@ public class BigQueryIO {
       input.getPipeline()
           .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
           .apply("Cleanup", ParDo.of(
-              new OldDoFn<CleanupOperation, Void>() {
-                @Override
+              new DoFn<CleanupOperation, Void>() {
+                @ProcessElement
                 public void processElement(ProcessContext c)
                     throws Exception {
                   c.element().cleanup(c.getPipelineOptions());
@@ -717,8 +716,8 @@ public class BigQueryIO {
       return outputs.get(mainOutput);
     }
 
-    private static class IdentityFn<T> extends OldDoFn<T, T> {
-      @Override
+    private static class IdentityFn<T> extends DoFn<T, T> {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element());
       }
@@ -1271,7 +1270,7 @@ public class BigQueryIO {
    * <p>Here is a sample transform that produces TableRow values containing
    * "word" and "count" columns:
    * <pre>{@code
-   * static class FormatCountsFn extends OldDoFn<KV<String, Long>, TableRow> {
+   * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
    *   public void processElement(ProcessContext c) {
    *     TableRow row = new TableRow()
    *         .set("word", c.element().getKey())
@@ -2307,11 +2306,11 @@ public class BigQueryIO {
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Implementation of OldDoFn to perform streaming BigQuery write.
+   * Implementation of DoFn to perform streaming BigQuery write.
    */
   @SystemDoFnInternal
   private static class StreamingWriteFn
-      extends OldDoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
+      extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
     /** TableSchema in JSON. Use String to make the class Serializable. */
     private final String jsonTableSchema;
 
@@ -2339,14 +2338,14 @@ public class BigQueryIO {
     }
 
     /** Prepares a target BigQuery table. */
-    @Override
+    @StartBundle
     public void startBundle(Context context) {
       tableRows = new HashMap<>();
       uniqueIdsForTableRows = new HashMap<>();
     }
 
     /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext context) {
       String tableSpec = context.element().getKey().getKey();
       List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec);
@@ -2357,7 +2356,7 @@ public class BigQueryIO {
     }
 
     /** Writes the accumulated rows into BigQuery with streaming API. */
-    @Override
+    @FinishBundle
     public void finishBundle(Context context) throws Exception {
       BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
 
@@ -2544,8 +2543,7 @@ public class BigQueryIO {
    * id is created by concatenating this randomUUID with a sequential number.
    */
   private static class TagWithUniqueIdsAndTable
-      extends OldDoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>>
-      implements OldDoFn.RequiresWindowAccess {
+      extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
     /** TableSpec to write to. */
     private final String tableSpec;
 
@@ -2571,18 +2569,18 @@ public class BigQueryIO {
     }
 
 
-    @Override
+    @StartBundle
     public void startBundle(Context context) {
       randomUUID = UUID.randomUUID().toString();
     }
 
     /** Tag the input with a unique id. */
-    @Override
-    public void processElement(ProcessContext context) throws IOException {
+    @ProcessElement
+    public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
       String uniqueId = randomUUID + sequenceNo++;
       ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
       String tableSpec = tableSpecFromWindow(
-          context.getPipelineOptions().as(BigQueryOptions.class), context.window());
+          context.getPipelineOptions().as(BigQueryOptions.class), window);
       // We output on keys 0-50 to ensure that there's enough batching for
       // BigQuery.
       context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1f77e3e..bfdf4aa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -55,7 +55,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.ByteString;
 
 import io.grpc.Status;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +64,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
-
 import javax.annotation.Nullable;
 
 /**
@@ -512,7 +510,7 @@ public class BigtableIO {
       return new BigtableServiceImpl(options);
     }
 
-    private class BigtableWriterFn extends OldDoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+    private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
 
       public BigtableWriterFn(String tableId, BigtableService bigtableService) {
         this.tableId = checkNotNull(tableId, "tableId");
@@ -520,13 +518,13 @@ public class BigtableIO {
         this.failures = new ConcurrentLinkedQueue<>();
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         bigtableWriter = bigtableService.openForWriting(tableId);
         recordsWritten = 0;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         checkForFailures();
         Futures.addCallback(
@@ -534,7 +532,7 @@ public class BigtableIO {
         ++recordsWritten;
       }
 
-      @Override
+      @FinishBundle
       public void finishBundle(Context c) throws Exception {
         bigtableWriter.close();
         bigtableWriter = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 6f3663a..052feb3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
@@ -478,11 +478,11 @@ public class V1Beta3 {
     }
 
     /**
-     * A {@link OldDoFn} that splits a given query into multiple sub-queries, assigns them unique
+     * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
      * keys and outputs them as {@link KV}.
      */
     @VisibleForTesting
-    static class SplitQueryFn extends OldDoFn<Query, KV<Integer, Query>> {
+    static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
       private final V1Beta3Options options;
       // number of splits to make for a given query
       private final int numSplits;
@@ -505,13 +505,13 @@ public class V1Beta3 {
         this.datastoreFactory = datastoreFactory;
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId);
         querySplitter = datastoreFactory.getQuerySplitter();
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         int key = 1;
         Query query = c.element();
@@ -559,10 +559,10 @@ public class V1Beta3 {
     }
 
     /**
-     * A {@link OldDoFn} that reads entities from Datastore for each query.
+     * A {@link DoFn} that reads entities from Datastore for each query.
      */
     @VisibleForTesting
-    static class ReadFn extends OldDoFn<Query, Entity> {
+    static class ReadFn extends DoFn<Query, Entity> {
       private final V1Beta3Options options;
       private final V1Beta3DatastoreFactory datastoreFactory;
       // Datastore client
@@ -578,13 +578,13 @@ public class V1Beta3 {
         this.datastoreFactory = datastoreFactory;
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
       }
 
       /** Read and output entities for the given query. */
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
         Query query = context.element();
         String namespace = options.getNamespace();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 1ea1f94..6d6eb60 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
 import static com.google.common.base.Preconditions.checkArgument;
+
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -64,8 +65,8 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -131,7 +132,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
-
 import javax.annotation.Nullable;
 
 /**
@@ -235,7 +235,7 @@ public class BigQueryIOTest implements Serializable {
     private Object[] pollJobReturns;
     private String executingProject;
     // Both counts will be reset back to zeros after serialization.
-    // This is a work around for OldDoFn's verifyUnmodified check.
+    // This is a workaround for DoFn's verifyUnmodified check.
     private transient int startJobCallsCount;
     private transient int pollJobStatusCallsCount;
 
@@ -571,8 +571,8 @@ public class BigQueryIOTest implements Serializable {
         .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
             .withTestServices(fakeBqServices)
             .withoutValidation())
-        .apply(ParDo.of(new OldDoFn<TableRow, String>() {
-          @Override
+        .apply(ParDo.of(new DoFn<TableRow, String>() {
+          @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             c.output((String) c.element().get("name"));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 83489a5..ee3a6f9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 
@@ -108,8 +108,8 @@ public class BigtableWriteIT implements Serializable {
 
     Pipeline p = Pipeline.create(options);
     p.apply(CountingInput.upTo(numRows))
-        .apply(ParDo.of(new OldDoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
-          @Override
+        .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             int index = c.element().intValue();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
index daed1cb..7eaf23e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
@@ -27,7 +27,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 
@@ -60,7 +60,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
-
 import javax.annotation.Nullable;
 
 class V1Beta3TestUtil {
@@ -109,9 +108,9 @@ class V1Beta3TestUtil {
   }
 
   /**
-   * A OldDoFn that creates entity for a long number.
+   * A DoFn that creates an entity for a long number.
    */
-  static class CreateEntityFn extends OldDoFn<Long, Entity> {
+  static class CreateEntityFn extends DoFn<Long, Entity> {
     private final String kind;
     @Nullable
     private final String namespace;
@@ -124,7 +123,7 @@ class V1Beta3TestUtil {
       ancestorKey = makeAncestorKey(namespace, kind, ancestor);
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index eeb02e6..557fe13 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -453,7 +453,7 @@ public class JmsIO {
       checkArgument((queue != null || topic != null), "Either queue or topic is required");
     }
 
-    private static class JmsWriter extends OldDoFn<String, Void> {
+    private static class JmsWriter extends DoFn<String, Void> {
 
       private ConnectionFactory connectionFactory;
       private String queue;
@@ -469,7 +469,7 @@ public class JmsIO {
         this.topic = topic;
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         if (producer == null) {
           this.connection = connectionFactory.createConnection();
@@ -486,7 +486,7 @@ public class JmsIO {
         }
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext ctx) throws Exception {
         String value = ctx.element();
 
@@ -499,7 +499,7 @@ public class JmsIO {
         }
       }
 
-      @Override
+      @FinishBundle
       public void finishBundle(Context c) throws Exception {
         producer.close();
         producer = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2271216..2383105 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -94,7 +94,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.annotation.Nullable;
 
 /**
@@ -550,8 +549,8 @@ public class KafkaIO {
       return typedRead
           .apply(begin)
           .apply("Remove Kafka Metadata",
-              ParDo.of(new OldDoFn<KafkaRecord<K, V>, KV<K, V>>() {
-                @Override
+              ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                @ProcessElement
                 public void processElement(ProcessContext ctx) {
                   ctx.output(ctx.element().getKV());
                 }
@@ -1315,8 +1314,8 @@ public class KafkaIO {
     public PDone apply(PCollection<V> input) {
       return input
         .apply("Kafka values with default key",
-          ParDo.of(new OldDoFn<V, KV<Void, V>>() {
-            @Override
+          ParDo.of(new DoFn<V, KV<Void, V>>() {
+            @ProcessElement
             public void processElement(ProcessContext ctx) throws Exception {
               ctx.output(KV.<Void, V>of(null, ctx.element()));
             }
@@ -1326,9 +1325,9 @@ public class KafkaIO {
     }
   }
 
-  private static class KafkaWriter<K, V> extends OldDoFn<KV<K, V>, Void> {
+  private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
 
-    @Override
+    @StartBundle
     public void startBundle(Context c) throws Exception {
       // Producer initialization is fairly costly. Move this to future initialization api to avoid
       // creating a producer for each bundle.
@@ -1341,7 +1340,7 @@ public class KafkaIO {
       }
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext ctx) throws Exception {
       checkForFailures();
 
@@ -1351,7 +1350,7 @@ public class KafkaIO {
           new SendCallback());
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) throws Exception {
       producer.flush();
       producer.close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d7b1921..9a89c36 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -33,10 +33,10 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -280,8 +280,8 @@ public class KafkaIOTest {
     p.run();
   }
 
-  private static class ElementValueDiff extends OldDoFn<Long, Long> {
-    @Override
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element() - c.timestamp().getMillis());
     }
@@ -308,8 +308,8 @@ public class KafkaIOTest {
     p.run();
   }
 
-  private static class RemoveKafkaMetadata<K, V> extends OldDoFn<KafkaRecord<K, V>, KV<K, V>> {
-    @Override
+  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
+    @ProcessElement
     public void processElement(ProcessContext ctx) throws Exception {
       ctx.output(ctx.element().getKV());
     }



[7/9] incubator-beam git commit: Propagate getAllowedTimestampSkew from DoFn to its adapter

Posted by ke...@apache.org.
Propagate getAllowedTimestampSkew from DoFn to its adapter


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1c06d71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1c06d71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1c06d71

Branch: refs/heads/master
Commit: a1c06d71876384722982ec24da1607e41af653d9
Parents: 0b18652
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 14:56:19 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/DoFnReflector.java     | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1c06d71/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index 9bdfde8..c6168b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -72,6 +72,7 @@ import net.bytebuddy.jar.asm.Label;
 import net.bytebuddy.jar.asm.MethodVisitor;
 import net.bytebuddy.jar.asm.Opcodes;
 import net.bytebuddy.matcher.ElementMatchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 import java.io.IOException;
@@ -731,6 +732,11 @@ public abstract class DoFnReflector {
     }
 
     @Override
+    public Duration getAllowedTimestampSkew() {
+      return fn.getAllowedTimestampSkew();
+    }
+
+    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       builder.include(fn);
     }


[8/9] incubator-beam git commit: Port easy Java SDK tests to new DoFn

Posted by ke...@apache.org.
Port easy Java SDK tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1959ddbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1959ddbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1959ddbe

Branch: refs/heads/master
Commit: 1959ddbedb2ad61824bf28e1e9139cc677a2aaf5
Parents: f5011e5
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/PipelineTest.java  |  8 ++---
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  6 ++--
 .../beam/sdk/coders/CoderRegistryTest.java      | 10 +++---
 .../beam/sdk/coders/SerializableCoderTest.java  | 10 +++---
 .../apache/beam/sdk/io/CountingInputTest.java   |  6 ++--
 .../apache/beam/sdk/io/CountingSourceTest.java  |  6 ++--
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  6 ++--
 .../sdk/transforms/ApproximateUniqueTest.java   |  6 ++--
 .../beam/sdk/transforms/CombineFnsTest.java     |  4 +--
 .../apache/beam/sdk/transforms/CombineTest.java | 18 +++++------
 .../apache/beam/sdk/transforms/CreateTest.java  |  4 +--
 .../apache/beam/sdk/transforms/FlattenTest.java |  8 ++---
 .../beam/sdk/transforms/GroupByKeyTest.java     |  8 ++---
 .../beam/sdk/transforms/WithTimestampsTest.java | 12 +++----
 .../display/DisplayDataEvaluatorTest.java       | 10 +++---
 .../sdk/transforms/display/DisplayDataTest.java |  6 ++--
 .../sdk/transforms/join/CoGroupByKeyTest.java   | 34 ++++++++++----------
 .../sdk/transforms/windowing/WindowTest.java    | 10 +++---
 .../sdk/transforms/windowing/WindowingTest.java | 23 ++++++-------
 .../beam/sdk/values/PCollectionTupleTest.java   |  6 ++--
 .../apache/beam/sdk/values/TypedPValueTest.java | 10 +++---
 21 files changed, 106 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 5137031..8b86499 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -146,9 +146,9 @@ public class PipelineTest {
 
   private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
       final String suffix) {
-    return ParDo.of(new OldDoFn<String, String>() {
-      @Override
-      public void processElement(OldDoFn<String, String>.ProcessContext c) {
+    return ParDo.of(new DoFn<String, String>() {
+      @ProcessElement
+      public void processElement(DoFn<String, String>.ProcessContext c) {
         c.output(c.element() + suffix);
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 41d0932..3b13e35 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -134,8 +134,8 @@ public class AvroCoderTest {
     }
   }
 
-  private static class GetTextFn extends OldDoFn<Pojo, String> {
-    @Override
+  private static class GetTextFn extends DoFn<Pojo, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().text);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 35ec6c6..da15405 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CloudObject;
@@ -366,8 +366,8 @@ public class CoderRegistryTest {
   private static class PTransformOutputingMySerializableGeneric
   extends PTransform<PCollection<String>, PCollection<KV<String, MySerializableGeneric<String>>>> {
 
-    private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<String>>> {
-      @Override
+    private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<String>>> {
+      @ProcessElement
       public void processElement(ProcessContext c) { }
     }
 
@@ -430,8 +430,8 @@ public class CoderRegistryTest {
       PCollection<String>,
       PCollection<KV<String, MySerializableGeneric<T>>>> {
 
-    private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<T>>> {
-      @Override
+    private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<T>>> {
+      @ProcessElement
       public void processElement(ProcessContext c) { }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index 3e7fd50..b5465fa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -82,15 +82,15 @@ public class SerializableCoderTest implements Serializable {
     }
   }
 
-  static class StringToRecord extends OldDoFn<String, MyRecord> {
-    @Override
+  static class StringToRecord extends DoFn<String, MyRecord> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(new MyRecord(c.element()));
     }
   }
 
-  static class RecordToString extends OldDoFn<MyRecord, String> {
-    @Override
+  static class RecordToString extends DoFn<MyRecord, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 95f7454..4ec2c9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -120,8 +120,8 @@ public class CountingInputTest {
     assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
   }
 
-  private static class ElementValueDiff extends OldDoFn<Long, Long> {
-    @Override
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element() - c.timestamp().getMillis());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 45f636f..0bd91c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -159,8 +159,8 @@ public class CountingSourceTest {
     p.run();
   }
 
-  private static class ElementValueDiff extends OldDoFn<Long, Long> {
-    @Override
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element() - c.timestamp().getMillis());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index f8592c9..db03a5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.PubsubClient;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest {
   private static final String ID_LABEL = "id";
   private static final int NUM_SHARDS = 10;
 
-  private static class Stamp extends OldDoFn<String, String> {
-    @Override
+  private static class Stamp extends DoFn<String, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 5c8732f..7b6d671 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -54,7 +54,7 @@ import java.util.List;
  */
 @RunWith(JUnit4.class)
 public class ApproximateUniqueTest implements Serializable {
-  // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses
+  // implements Serializable just to make it easy to use anonymous inner DoFn subclasses
 
   @Test
   public void testEstimationErrorToSampleSize() {
@@ -223,8 +223,8 @@ public class ApproximateUniqueTest implements Serializable {
             .apply(View.<Long>asSingleton());
 
     PCollection<KV<Long, Long>> approximateAndExact = approximate
-        .apply(ParDo.of(new OldDoFn<Long, KV<Long, Long>>() {
-              @Override
+        .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(KV.of(c.element(), c.sideInput(exact)));
               }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index d6bf826..95ba1aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -461,7 +461,7 @@ public class  CombineFnsTest {
   }
 
   private static class ExtractResultDoFn
-      extends OldDoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
+      extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
 
     private final TupleTag<Integer> maxIntTag;
     private final TupleTag<UserString> concatStringTag;
@@ -471,7 +471,7 @@ public class  CombineFnsTest {
       this.concatStringTag = concatStringTag;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       UserString userString = c.element().getValue().get(concatStringTag);
       KV<Integer, String> value = KV.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index cb9928e..6421b3b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -117,7 +117,7 @@ public class CombineTest implements Serializable {
     1, 1, 2, 3, 5, 8, 13, 21, 34, 55
   };
 
-  @Mock private OldDoFn<?, ?>.ProcessContext processContext;
+  @Mock private DoFn<?, ?>.ProcessContext processContext;
 
   PCollection<KV<String, Integer>> createInput(Pipeline p,
                                                KV<String, Integer>[] table) {
@@ -372,8 +372,8 @@ public class CombineTest implements Serializable {
     pipeline.run();
   }
 
-  private static class FormatPaneInfo extends OldDoFn<Integer, String> {
-    @Override
+  private static class FormatPaneInfo extends DoFn<Integer, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element() + ": " + c.pane().isLast());
     }
@@ -560,8 +560,8 @@ public class CombineTest implements Serializable {
     pipeline.run();
   }
 
-  private static class GetLast extends OldDoFn<Integer, Integer> {
-    @Override
+  private static class GetLast extends DoFn<Integer, Integer> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.pane().isLast()) {
         c.output(c.element());
@@ -653,8 +653,8 @@ public class CombineTest implements Serializable {
 
     PCollection<Integer> output = pipeline
         .apply("CreateVoidMainInput", Create.of((Void) null))
-        .apply("OutputSideInput", ParDo.of(new OldDoFn<Void, Integer>() {
-                  @Override
+        .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
@@ -1176,8 +1176,8 @@ public class CombineTest implements Serializable {
   }
 
   private static <T> PCollection<T> copy(PCollection<T> pc, final int n) {
-    return pc.apply(ParDo.of(new OldDoFn<T, T>() {
-      @Override
+    return pc.apply(ParDo.of(new DoFn<T, T>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         for (int i = 0; i < n; i++) {
           c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index cf65423..9db0136 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -229,8 +229,8 @@ public class CreateTest {
     p.run();
   }
 
-  private static class PrintTimestamps extends OldDoFn<String, String> {
-    @Override
+  private static class PrintTimestamps extends DoFn<String, String> {
+    @ProcessElement
       public void processElement(ProcessContext c) {
       c.output(c.element() + ":" + c.timestamp().getMillis());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index b81eedb..604536b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -130,8 +130,8 @@ public class FlattenTest implements Serializable {
 
     PCollection<String> output = p
         .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-        .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() {
-                  @Override
+        .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String side : c.sideInput(view)) {
                       c.output(side);
@@ -339,8 +339,8 @@ public class FlattenTest implements Serializable {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static class IdentityFn<T> extends OldDoFn<T, T> {
-    @Override
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 15c3ba8..afe460f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -371,14 +371,14 @@ public class GroupByKeyTest {
     pipeline.run();
   }
 
-  private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> {
+  private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> {
     private final Instant timestamp;
 
     public AssertTimestamp(Instant timestamp) {
       this.timestamp = timestamp;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       assertThat(c.timestamp(), equalTo(timestamp));
     }
@@ -506,9 +506,9 @@ public class GroupByKeyTest {
    * Creates a KV that wraps the original KV together with a random key.
    */
   static class AssignRandomKey
-      extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
+      extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index d2ba452..e381470 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable {
          .apply(WithTimestamps.of(timestampFn));
 
     PCollection<KV<String, Instant>> timestampedVals =
-        timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
-          @Override
-          public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+        timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+          @ProcessElement
+          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
               throws Exception {
             c.output(KV.of(c.element(), c.timestamp()));
           }
@@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable {
              WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L)));
 
     PCollection<KV<String, Instant>> timestampedVals =
-        timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
-          @Override
-          public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+        timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+          @ProcessElement
+          public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
               throws Exception {
             c.output(KV.of(c.element(), c.timestamp()));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index c1848c6..e233114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
@@ -50,8 +50,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
         new PTransform<PCollection<String>, POutput> () {
           @Override
           public PCollection<String> apply(PCollection<String> input) {
-            return input.apply(ParDo.of(new OldDoFn<String, String>() {
-              @Override
+            return input.apply(ParDo.of(new DoFn<String, String>() {
+              @ProcessElement
               public void processElement(ProcessContext c) throws Exception {
                 c.output(c.element());
               }
@@ -79,8 +79,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
   @Test
   public void testPrimitiveTransform() {
     PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of(
-        new OldDoFn<Integer, Integer>() {
-      @Override
+        new DoFn<Integer, Integer>() {
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {}
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 517f968..e2f38b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -1053,8 +1053,8 @@ public class DisplayDataTest implements Serializable {
   private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
     @Override
     public PCollection<T> apply(PCollection<T> input) {
-      return input.apply(ParDo.of(new OldDoFn<T, T>() {
-        @Override
+      return input.apply(ParDo.of(new DoFn<T, T>() {
+        @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           c.output(c.element());
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 97667a3..c6f82ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -29,9 +29,8 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -84,10 +83,11 @@ public class CoGroupByKeyTest implements Serializable {
       input = p.apply("Create" + name, Create.timestamped(list, timestamps)
           .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
     }
-    return input
-            .apply("Identity" + name, ParDo.of(new OldDoFn<KV<Integer, String>,
-                                                 KV<Integer, String>>() {
-              @Override
+    return input.apply(
+        "Identity" + name,
+        ParDo.of(
+            new DoFn<KV<Integer, String>, KV<Integer, String>>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(c.element());
               }
@@ -313,11 +313,11 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   /**
-   * A OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the
-   * results of a CoGroupByKey.
+   * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the results of a
+   * CoGroupByKey.
    */
-  private static class ClickOfPurchaseFn extends
-      OldDoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess {
+  private static class ClickOfPurchaseFn
+      extends DoFn<KV<Integer, CoGbkResult>, KV<String, String>> {
     private final TupleTag<String> clicksTag;
 
     private final TupleTag<String> purchasesTag;
@@ -329,9 +329,9 @@ public class CoGroupByKeyTest implements Serializable {
       this.purchasesTag = purchasesTag;
     }
 
-    @Override
-    public void processElement(ProcessContext c) {
-      BoundedWindow w = c.window();
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      BoundedWindow w = window;
       KV<Integer, CoGbkResult> e = c.element();
       CoGbkResult row = e.getValue();
       Iterable<String> clicks = row.getAll(clicksTag);
@@ -347,11 +347,11 @@ public class CoGroupByKeyTest implements Serializable {
 
 
   /**
-   * A OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the
+   * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the
    * results of a CoGroupByKey.
    */
   private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends
-      OldDoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
+      DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
     private final TupleTag<String> purchasesTag;
 
     private final TupleTag<String> addressesTag;
@@ -367,7 +367,7 @@ public class CoGroupByKeyTest implements Serializable {
       this.namesTag = namesTag;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<Integer, CoGbkResult> e = c.element();
       CoGbkResult row = e.getValue();
@@ -401,7 +401,7 @@ public class CoGroupByKeyTest implements Serializable {
   }
 
   /**
-   * Tests that the consuming OldDoFn
+   * Tests that the consuming DoFn
    * (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected.
    */
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 27d2539..c583860 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -199,8 +199,8 @@ public class WindowTest implements Serializable {
         .apply(GroupByKey.<Integer, String>create())
         .apply(
             ParDo.of(
-                new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
-                  @Override
+                new DoFn<KV<Integer, Iterable<String>>, Void>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) throws Exception {
                     assertThat(
                         c.timestamp(),
@@ -231,8 +231,8 @@ public class WindowTest implements Serializable {
         .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
             .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
         .apply(GroupByKey.<Integer, String>create())
-        .apply(ParDo.of(new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
-          @Override
+        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
+          @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1)));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 622a277..159e700 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -26,9 +26,8 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -58,12 +57,14 @@ public class WindowingTest implements Serializable {
 
   private static class WindowedCount extends PTransform<PCollection<String>, PCollection<String>> {
 
-    private final class FormatCountsDoFn
-        extends OldDoFn<KV<String, Long>, String> implements RequiresWindowAccess {
-      @Override
-          public void processElement(ProcessContext c) {
-        c.output(c.element().getKey() + ":" + c.element().getValue()
-            + ":" + c.timestamp().getMillis() + ":" + c.window());
+    private final class FormatCountsDoFn extends DoFn<KV<String, Long>, String> {
+      @ProcessElement
+      public void processElement(ProcessContext c, BoundedWindow window) {
+        c.output(
+            c.element().getKey()
+                + ":" + c.element().getValue()
+                + ":" + c.timestamp().getMillis()
+                + ":" + window);
       }
     }
     private WindowFn<? super String, ?> windowFn;
@@ -234,9 +235,9 @@ public class WindowingTest implements Serializable {
     p.run();
   }
 
-  /** A OldDoFn that tokenizes lines of text into individual words. */
-  static class ExtractWordsWithTimestampsFn extends OldDoFn<String, String> {
-    @Override
+  /** A DoFn that tokenizes lines of text into individual words. */
+  static class ExtractWordsWithTimestampsFn extends DoFn<String, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] words = c.element().split("[^a-zA-Z0-9']+");
       if (words.length == 2) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 547c778..13218b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -75,8 +75,8 @@ public final class PCollectionTupleTest implements Serializable {
         .apply(Create.of(inputs));
 
     PCollectionTuple outputs = mainInput.apply(ParDo
-        .of(new OldDoFn<Integer, Integer>() {
-          @Override
+        .of(new DoFn<Integer, Integer>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.sideOutput(sideOutputTag, c.element());
           }})

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index c525cf1..287223f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 
 import org.junit.Rule;
@@ -44,9 +44,9 @@ public class TypedPValueTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private static class IdentityDoFn extends OldDoFn<Integer, Integer> {
+  private static class IdentityDoFn extends DoFn<Integer, Integer> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element());
     }
@@ -129,9 +129,9 @@ public class TypedPValueTest {
   static class EmptyClass {
   }
 
-  private static class EmptyClassDoFn extends OldDoFn<Integer, EmptyClass> {
+  private static class EmptyClassDoFn extends DoFn<Integer, EmptyClass> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(new EmptyClass());
     }