Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:52:59 UTC

[41/51] [abbrv] incubator-beam git commit: Port easy I/O transforms to new DoFn

Port easy I/O transforms to new DoFn
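
For background: the new DoFn replaces OldDoFn's @Override lifecycle methods with
method annotations (@ProcessElement, @StartBundle, @FinishBundle), which is the
mechanical change applied across the I/O transforms below. A minimal before/after
sketch of the core pattern (class names and logic here are illustrative, not from
this commit):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.OldDoFn;

    // Before: subclass OldDoFn and override the fixed method name.
    class LengthFnOld extends OldDoFn<String, Integer> {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element().length());
      }
    }

    // After: subclass DoFn and mark the method with @ProcessElement.
    class LengthFn extends DoFn<String, Integer> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().length());
      }
    }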


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/269fbf38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/269fbf38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/269fbf38

Branch: refs/heads/python-sdk
Commit: 269fbf386454ea77845e54764a125edba7039b03
Parents: ef5e31f
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:22:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  3 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 14 ++++----
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 17 +++++----
 .../beam/sdk/io/PubsubUnboundedSource.java      |  7 ++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 36 +++++++++-----------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 12 +++----
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 18 +++++-----
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 10 +++---
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    |  6 ++--
 .../sdk/io/gcp/datastore/V1Beta3TestUtil.java   |  9 +++--
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java  | 10 +++---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 19 +++++------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 10 +++---
 13 files changed, 82 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index abcf415..fadd9c7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -78,6 +78,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -2715,7 +2716,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     @Nullable
     private PTransform<?, ?> transform;
     @Nullable
-    private OldDoFn<?, ?> doFn;
+    private DoFn<?, ?> doFn;
 
     /**
      * Builds an instance of this class from the overridden transform.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 1902bca..2b27175 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -709,11 +709,11 @@ public class PubsubIO {
        *
        * <p>Public so can be suppressed by runners.
        */
-      public class PubsubBoundedReader extends OldDoFn<Void, T> {
+      public class PubsubBoundedReader extends DoFn<Void, T> {
         private static final int DEFAULT_PULL_SIZE = 100;
         private static final int ACK_TIMEOUT_SEC = 60;
 
-        @Override
+        @ProcessElement
         public void processElement(ProcessContext c) throws IOException {
           try (PubsubClient pubsubClient =
                    FACTORY.newClient(timestampLabel, idLabel,
@@ -998,12 +998,12 @@ public class PubsubIO {
        *
        * <p>Public so can be suppressed by runners.
        */
-      public class PubsubBoundedWriter extends OldDoFn<T, Void> {
+      public class PubsubBoundedWriter extends DoFn<T, Void> {
         private static final int MAX_PUBLISH_BATCH_SIZE = 100;
         private transient List<OutgoingMessage> output;
         private transient PubsubClient pubsubClient;
 
-        @Override
+        @StartBundle
         public void startBundle(Context c) throws IOException {
           this.output = new ArrayList<>();
           // NOTE: idLabel is ignored.
@@ -1012,7 +1012,7 @@ public class PubsubIO {
                                 c.getPipelineOptions().as(PubsubOptions.class));
         }
 
-        @Override
+        @ProcessElement
         public void processElement(ProcessContext c) throws IOException {
           // NOTE: The record id is always null.
           OutgoingMessage message =
@@ -1025,7 +1025,7 @@ public class PubsubIO {
           }
         }
 
-        @Override
+        @FinishBundle
         public void finishBundle(Context c) throws IOException {
           if (!output.isEmpty()) {
             publish();
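
As the PubsubBoundedWriter hunks above show, bundle setup and teardown move from
overriding startBundle/finishBundle to annotating them with @StartBundle and
@FinishBundle; in this SDK version both still receive a Context argument. A
self-contained sketch of that lifecycle, with hypothetical names and batching
logic:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    class BufferingFn extends DoFn<String, Void> {
      private transient List<String> buffer;

      @StartBundle
      public void startBundle(Context c) {
        buffer = new ArrayList<>();   // fresh buffer per bundle
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        buffer.add(c.element());      // accumulate elements within the bundle
      }

      @FinishBundle
      public void finishBundle(Context c) {
        // flush `buffer` to the external system here, then release it
        buffer = null;
      }
    }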

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 9e9536d..3014751 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -65,7 +65,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-
 import javax.annotation.Nullable;
 
 /**
@@ -78,7 +77,7 @@ import javax.annotation.Nullable;
  * <li>We try to send messages in batches while also limiting send latency.
  * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
  * <li>Though some background threads are used by the underlying netty system all actual Pubsub
- * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
  * to execute concurrently and hide latency.
  * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
  * to dedup messages.
@@ -155,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   /**
    * Convert elements to messages and shard them.
    */
-  private static class ShardFn<T> extends OldDoFn<T, KV<Integer, OutgoingMessage>> {
+  private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
     private final Aggregator<Long, Long> elementCounter =
         createAggregator("elements", new Sum.SumLongFn());
     private final Coder<T> elementCoder;
@@ -168,7 +167,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       this.recordIdMethod = recordIdMethod;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       elementCounter.addValue(1L);
       byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
@@ -207,7 +206,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    * Publish messages to Pubsub in batches.
    */
   private static class WriterFn
-      extends OldDoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+      extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
     private final PubsubClientFactory pubsubFactory;
     private final TopicPath topic;
     private final String timestampLabel;
@@ -253,14 +252,14 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       byteCounter.addValue((long) bytes);
     }
 
-    @Override
+    @StartBundle
     public void startBundle(Context c) throws Exception {
       checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
       pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
                                              c.getPipelineOptions().as(PubsubOptions.class));
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
       int bytes = 0;
@@ -285,7 +284,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       }
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) throws Exception {
       pubsubClient.close();
       pubsubClient = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index d98bd6a..f99b471 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -77,7 +77,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.annotation.Nullable;
 
 /**
@@ -1107,7 +1106,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // StatsFn
   // ================================================================================
 
-  private static class StatsFn<T> extends OldDoFn<T, T> {
+  private static class StatsFn<T> extends DoFn<T, T> {
     private final Aggregator<Long, Long> elementCounter =
         createAggregator("elements", new Sum.SumLongFn());
 
@@ -1131,7 +1130,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       this.idLabel = idLabel;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       elementCounter.addValue(1L);
       c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2ba7562..ed2c32e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -103,7 +104,6 @@ import com.google.common.io.CountingOutputStream;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.apache.avro.generic.GenericRecord;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -135,7 +135,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import javax.annotation.Nullable;
 
 /**
@@ -334,7 +333,7 @@ public class BigQueryIO {
    * <p>Each {@link TableRow} contains values indexed by column name. Here is a
    * sample processing function that processes a "line" column from rows:
    * <pre>{@code
-   * static class ExtractWordsFn extends OldDoFn<TableRow, String> {
+   * static class ExtractWordsFn extends DoFn<TableRow, String> {
    *   public void processElement(ProcessContext c) {
    *     // Get the "line" field of the TableRow object, split it into words, and emit them.
    *     TableRow row = c.element();
@@ -706,8 +705,8 @@ public class BigQueryIO {
       input.getPipeline()
           .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
           .apply("Cleanup", ParDo.of(
-              new OldDoFn<CleanupOperation, Void>() {
-                @Override
+              new DoFn<CleanupOperation, Void>() {
+                @ProcessElement
                 public void processElement(ProcessContext c)
                     throws Exception {
                   c.element().cleanup(c.getPipelineOptions());
@@ -717,8 +716,8 @@ public class BigQueryIO {
       return outputs.get(mainOutput);
     }
 
-    private static class IdentityFn<T> extends OldDoFn<T, T> {
-      @Override
+    private static class IdentityFn<T> extends DoFn<T, T> {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element());
       }
@@ -1271,7 +1270,7 @@ public class BigQueryIO {
    * <p>Here is a sample transform that produces TableRow values containing
    * "word" and "count" columns:
    * <pre>{@code
-   * static class FormatCountsFn extends OldDoFn<KV<String, Long>, TableRow> {
+   * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
    *   public void processElement(ProcessContext c) {
    *     TableRow row = new TableRow()
    *         .set("word", c.element().getKey())
@@ -2307,11 +2306,11 @@ public class BigQueryIO {
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Implementation of OldDoFn to perform streaming BigQuery write.
+   * Implementation of DoFn to perform streaming BigQuery write.
    */
   @SystemDoFnInternal
   private static class StreamingWriteFn
-      extends OldDoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
+      extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
     /** TableSchema in JSON. Use String to make the class Serializable. */
     private final String jsonTableSchema;
 
@@ -2339,14 +2338,14 @@ public class BigQueryIO {
     }
 
     /** Prepares a target BigQuery table. */
-    @Override
+    @StartBundle
     public void startBundle(Context context) {
       tableRows = new HashMap<>();
       uniqueIdsForTableRows = new HashMap<>();
     }
 
     /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext context) {
       String tableSpec = context.element().getKey().getKey();
       List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec);
@@ -2357,7 +2356,7 @@ public class BigQueryIO {
     }
 
     /** Writes the accumulated rows into BigQuery with streaming API. */
-    @Override
+    @FinishBundle
     public void finishBundle(Context context) throws Exception {
       BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
 
@@ -2544,8 +2543,7 @@ public class BigQueryIO {
    * id is created by concatenating this randomUUID with a sequential number.
    */
   private static class TagWithUniqueIdsAndTable
-      extends OldDoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>>
-      implements OldDoFn.RequiresWindowAccess {
+      extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
     /** TableSpec to write to. */
     private final String tableSpec;
 
@@ -2571,18 +2569,18 @@ public class BigQueryIO {
     }
 
 
-    @Override
+    @StartBundle
     public void startBundle(Context context) {
       randomUUID = UUID.randomUUID().toString();
     }
 
     /** Tag the input with a unique id. */
-    @Override
-    public void processElement(ProcessContext context) throws IOException {
+    @ProcessElement
+    public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
       String uniqueId = randomUUID + sequenceNo++;
       ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
       String tableSpec = tableSpecFromWindow(
-          context.getPipelineOptions().as(BigQueryOptions.class), context.window());
+          context.getPipelineOptions().as(BigQueryOptions.class), window);
       // We output on keys 0-50 to ensure that there's enough batching for
       // BigQuery.
       context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
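
One change in BigQueryIO above is more than a rename: TagWithUniqueIdsAndTable
previously implemented OldDoFn.RequiresWindowAccess and called context.window(),
whereas the new DoFn simply declares a BoundedWindow parameter on the
@ProcessElement method and the runner injects the current element's window. A
minimal sketch of that idiom (names here are illustrative):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

    class WindowTaggingFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        // The runner supplies the window of the element being processed.
        c.output(window.toString() + ":" + c.element());
      }
    }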

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1f77e3e..bfdf4aa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -55,7 +55,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.ByteString;
 
 import io.grpc.Status;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +64,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
-
 import javax.annotation.Nullable;
 
 /**
@@ -512,7 +510,7 @@ public class BigtableIO {
       return new BigtableServiceImpl(options);
     }
 
-    private class BigtableWriterFn extends OldDoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+    private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
 
       public BigtableWriterFn(String tableId, BigtableService bigtableService) {
         this.tableId = checkNotNull(tableId, "tableId");
@@ -520,13 +518,13 @@ public class BigtableIO {
         this.failures = new ConcurrentLinkedQueue<>();
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         bigtableWriter = bigtableService.openForWriting(tableId);
         recordsWritten = 0;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         checkForFailures();
         Futures.addCallback(
@@ -534,7 +532,7 @@ public class BigtableIO {
         ++recordsWritten;
       }
 
-      @Override
+      @FinishBundle
       public void finishBundle(Context c) throws Exception {
         bigtableWriter.close();
         bigtableWriter = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 6f3663a..052feb3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
@@ -478,11 +478,11 @@ public class V1Beta3 {
     }
 
     /**
-     * A {@link OldDoFn} that splits a given query into multiple sub-queries, assigns them unique
+     * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
      * keys and outputs them as {@link KV}.
      */
     @VisibleForTesting
-    static class SplitQueryFn extends OldDoFn<Query, KV<Integer, Query>> {
+    static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
       private final V1Beta3Options options;
       // number of splits to make for a given query
       private final int numSplits;
@@ -505,13 +505,13 @@ public class V1Beta3 {
         this.datastoreFactory = datastoreFactory;
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId);
         querySplitter = datastoreFactory.getQuerySplitter();
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         int key = 1;
         Query query = c.element();
@@ -559,10 +559,10 @@ public class V1Beta3 {
     }
 
     /**
-     * A {@link OldDoFn} that reads entities from Datastore for each query.
+     * A {@link DoFn} that reads entities from Datastore for each query.
      */
     @VisibleForTesting
-    static class ReadFn extends OldDoFn<Query, Entity> {
+    static class ReadFn extends DoFn<Query, Entity> {
       private final V1Beta3Options options;
       private final V1Beta3DatastoreFactory datastoreFactory;
       // Datastore client
@@ -578,13 +578,13 @@ public class V1Beta3 {
         this.datastoreFactory = datastoreFactory;
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
       }
 
       /** Read and output entities for the given query. */
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
         Query query = context.element();
         String namespace = options.getNamespace();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 1ea1f94..6d6eb60 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 
 import static com.google.common.base.Preconditions.checkArgument;
+
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -64,8 +65,8 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -131,7 +132,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
-
 import javax.annotation.Nullable;
 
 /**
@@ -235,7 +235,7 @@ public class BigQueryIOTest implements Serializable {
     private Object[] pollJobReturns;
     private String executingProject;
     // Both counts will be reset back to zeros after serialization.
-    // This is a work around for OldDoFn's verifyUnmodified check.
+    // This is a work around for DoFn's verifyUnmodified check.
     private transient int startJobCallsCount;
     private transient int pollJobStatusCallsCount;
 
@@ -571,8 +571,8 @@ public class BigQueryIOTest implements Serializable {
         .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
             .withTestServices(fakeBqServices)
             .withoutValidation())
-        .apply(ParDo.of(new OldDoFn<TableRow, String>() {
-          @Override
+        .apply(ParDo.of(new DoFn<TableRow, String>() {
+          @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             c.output((String) c.element().get("name"));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 83489a5..ee3a6f9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 
@@ -108,8 +108,8 @@ public class BigtableWriteIT implements Serializable {
 
     Pipeline p = Pipeline.create(options);
     p.apply(CountingInput.upTo(numRows))
-        .apply(ParDo.of(new OldDoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
-          @Override
+        .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             int index = c.element().intValue();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
index daed1cb..7eaf23e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
@@ -27,7 +27,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 
@@ -60,7 +60,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
-
 import javax.annotation.Nullable;
 
 class V1Beta3TestUtil {
@@ -109,9 +108,9 @@ class V1Beta3TestUtil {
   }
 
   /**
-   * A OldDoFn that creates entity for a long number.
+   * A DoFn that creates entity for a long number.
    */
-  static class CreateEntityFn extends OldDoFn<Long, Entity> {
+  static class CreateEntityFn extends DoFn<Long, Entity> {
     private final String kind;
     @Nullable
     private final String namespace;
@@ -124,7 +123,7 @@ class V1Beta3TestUtil {
       ancestorKey = makeAncestorKey(namespace, kind, ancestor);
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index eeb02e6..557fe13 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -453,7 +453,7 @@ public class JmsIO {
       checkArgument((queue != null || topic != null), "Either queue or topic is required");
     }
 
-    private static class JmsWriter extends OldDoFn<String, Void> {
+    private static class JmsWriter extends DoFn<String, Void> {
 
       private ConnectionFactory connectionFactory;
       private String queue;
@@ -469,7 +469,7 @@ public class JmsIO {
         this.topic = topic;
       }
 
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         if (producer == null) {
           this.connection = connectionFactory.createConnection();
@@ -486,7 +486,7 @@ public class JmsIO {
         }
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext ctx) throws Exception {
         String value = ctx.element();
 
@@ -499,7 +499,7 @@ public class JmsIO {
         }
       }
 
-      @Override
+      @FinishBundle
       public void finishBundle(Context c) throws Exception {
         producer.close();
         producer = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2271216..2383105 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -94,7 +94,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.annotation.Nullable;
 
 /**
@@ -550,8 +549,8 @@ public class KafkaIO {
       return typedRead
           .apply(begin)
           .apply("Remove Kafka Metadata",
-              ParDo.of(new OldDoFn<KafkaRecord<K, V>, KV<K, V>>() {
-                @Override
+              ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                @ProcessElement
                 public void processElement(ProcessContext ctx) {
                   ctx.output(ctx.element().getKV());
                 }
@@ -1315,8 +1314,8 @@ public class KafkaIO {
     public PDone apply(PCollection<V> input) {
       return input
         .apply("Kafka values with default key",
-          ParDo.of(new OldDoFn<V, KV<Void, V>>() {
-            @Override
+          ParDo.of(new DoFn<V, KV<Void, V>>() {
+            @ProcessElement
             public void processElement(ProcessContext ctx) throws Exception {
               ctx.output(KV.<Void, V>of(null, ctx.element()));
             }
@@ -1326,9 +1325,9 @@ public class KafkaIO {
     }
   }
 
-  private static class KafkaWriter<K, V> extends OldDoFn<KV<K, V>, Void> {
+  private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
 
-    @Override
+    @StartBundle
     public void startBundle(Context c) throws Exception {
       // Producer initialization is fairly costly. Move this to future initialization api to avoid
       // creating a producer for each bundle.
@@ -1341,7 +1340,7 @@ public class KafkaIO {
       }
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext ctx) throws Exception {
       checkForFailures();
 
@@ -1351,7 +1350,7 @@ public class KafkaIO {
           new SendCallback());
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) throws Exception {
       producer.flush();
       producer.close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d7b1921..9a89c36 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -33,10 +33,10 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -280,8 +280,8 @@ public class KafkaIOTest {
     p.run();
   }
 
-  private static class ElementValueDiff extends OldDoFn<Long, Long> {
-    @Override
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.element() - c.timestamp().getMillis());
     }
@@ -308,8 +308,8 @@ public class KafkaIOTest {
     p.run();
   }
 
-  private static class RemoveKafkaMetadata<K, V> extends OldDoFn<KafkaRecord<K, V>, KV<K, V>> {
-    @Override
+  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
+    @ProcessElement
     public void processElement(ProcessContext ctx) throws Exception {
       ctx.output(ctx.element().getKV());
     }