Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:36 UTC

[01/13] incubator-beam git commit: Port various Spark runner tests to new DoFn

Repository: incubator-beam
Updated Branches:
  refs/heads/master bb00810ad -> 574c3777d


Port various Spark runner tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5df3583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5df3583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5df3583

Branch: refs/heads/master
Commit: f5df358320cfde6a1c4d012d4169af691f6a18e9
Parents: d6395e9
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:31:07 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/spark/TfIdfTest.java    | 22 ++++++++++----------
 .../spark/translation/CombinePerKeyTest.java    |  6 +++---
 .../translation/MultiOutputWordCountTest.java   | 10 ++++-----
 .../spark/translation/SerializationTest.java    | 10 ++++-----
 .../streaming/KafkaStreamingTest.java           |  6 +++---
 5 files changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
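
The hunks below all apply the same mechanical rewrite: the OldDoFn import becomes DoFn, the classes extend DoFn, and the @Override on processElement becomes the @ProcessElement annotation, because the new DoFn discovers its entry point by annotation rather than by overriding an abstract method. A minimal before/after sketch of the pattern, mirroring FormatCountsFn from the diffs (the imports and class names here are illustrative, not part of the commit):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.OldDoFn;
    import org.apache.beam.sdk.values.KV;

    // Before: OldDoFn requires overriding the abstract processElement method.
    class FormatCountsFnOld extends OldDoFn<KV<String, Long>, String> {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element().getKey() + ": " + c.element().getValue());
      }
    }

    // After: DoFn locates the method by its @ProcessElement annotation,
    // so the name processElement is conventional rather than required.
    class FormatCountsFn extends DoFn<KV<String, Long>, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().getKey() + ": " + c.element().getValue());
      }
    }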


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 074e6aa..17bf6dd 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -101,8 +101,8 @@ public class TfIdfTest {
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
           .apply("SplitWords", ParDo.of(
-              new OldDoFn<KV<URI, String>, KV<URI, String>>() {
-                @Override
+              new DoFn<KV<URI, String>, KV<URI, String>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   String line = c.element().getValue();
@@ -144,8 +144,8 @@ public class TfIdfTest {
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
           .apply("ShiftKeys", ParDo.of(
-              new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
-                @Override
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey().getKey();
                   String word = c.element().getKey().getValue();
@@ -183,8 +183,8 @@ public class TfIdfTest {
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
           .apply("ComputeTermFrequencies", ParDo.of(
-              new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -208,8 +208,8 @@ public class TfIdfTest {
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
-              .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
-                @Override
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Long documentCount = c.element().getValue();
@@ -237,8 +237,8 @@ public class TfIdfTest {
       // divided by the log of the document frequency.
       return wordToUriAndTfAndDf
           .apply("ComputeTfIdf", ParDo.of(
-              new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Double df = c.element().getValue().getOnly(dfTag);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index dee9213..cdf2cfb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -70,8 +70,8 @@ public class CombinePerKeyTest {
     private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
       @Override
       public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
-          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new OldDoFn<T, KV<T, Long>>() {
-              @Override
+          PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
+              @ProcessElement
               public void processElement(ProcessContext processContext) throws Exception {
                   processContext.output(KV.of(processContext.element(), 1L));
               }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 066521b..291f7b2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.ApproximateUnique;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -105,7 +105,7 @@ public class MultiOutputWordCountTest {
   /**
    * A OldDoFn that tokenizes lines of text into individual words.
    */
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
 
     private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
         new Sum.SumIntegerFn());
@@ -117,7 +117,7 @@ public class MultiOutputWordCountTest {
       this.regex = regex;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] words = c.element().split(c.sideInput(regex));
       for (String word : words) {
@@ -170,8 +170,8 @@ public class MultiOutputWordCountTest {
     }
   }
 
-  private static class FormatCountsFn extends OldDoFn<KV<String, Long>, String> {
-    @Override
+  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().getKey() + ": " + c.element().getValue());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index fb97b8b..019b107 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -147,12 +147,12 @@ public class SerializationTest {
   /**
    * A OldDoFn that tokenizes lines of text into individual words.
    */
-  static class ExtractWordsFn extends OldDoFn<StringHolder, StringHolder> {
+  static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
     private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       // Split the line into words.
       String[] words = WORD_BOUNDARY.split(c.element().toString());
@@ -175,8 +175,8 @@ public class SerializationTest {
   /**
    * A OldDoFn that converts a Word and Count into a printable string.
    */
-  private static class FormatCountsFn extends OldDoFn<KV<StringHolder, Long>, StringHolder> {
-    @Override
+  private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fa98ca3..17044aa 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -121,8 +121,8 @@ public class KafkaStreamingTest {
     EMBEDDED_ZOOKEEPER.shutdown();
   }
 
-  private static class FormatKVFn extends OldDoFn<KV<String, String>, String> {
-    @Override
+  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().getKey() + "," + c.element().getValue());
     }


[09/13] incubator-beam git commit: Port Java 8 tests to new DoFn

Posted by ke...@apache.org.
Port Java 8 tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/879f18fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/879f18fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/879f18fd

Branch: refs/heads/master
Commit: 879f18fd1694b0540e3e695416566f24220ecb1e
Parents: 331f523
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:12:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/WithTimestampsJava8Test.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/879f18fd/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
index 1141e88..03aa647 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
@@ -47,9 +47,9 @@ public class WithTimestampsJava8Test implements Serializable {
          .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(yearTwoThousand))));
 
     PCollection<KV<String, Instant>> timestampedVals =
-        timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
-          @Override
-          public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+        timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c)
               throws Exception {
             c.output(KV.of(c.element(), c.timestamp()));
           }
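
Beyond the annotation swap, this hunk also drops the fully qualified parameter type: ProcessContext is a member type inherited from the DoFn being extended, so the plain name resolves inside the anonymous subclass and the verbose OldDoFn<String, KV<String, Instant>>.ProcessContext form is no longer needed. The signature contrast in isolation (a sketch, not compilable on its own):

    // Before:
    @Override
    public void processElement(
        OldDoFn<String, KV<String, Instant>>.ProcessContext c) throws Exception {
      c.output(KV.of(c.element(), c.timestamp()));
    }

    // After:
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      c.output(KV.of(c.element(), c.timestamp()));
    }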


[06/13] incubator-beam git commit: Port Window transform to new DoFn

Posted by ke...@apache.org.
Port Window transform to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c6aaf73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c6aaf73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c6aaf73

Branch: refs/heads/master
Commit: 2c6aaf730353c4db12aea60fd89851bddec0415c
Parents: ecf21a5
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:47:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/windowing/Window.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c6aaf73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 5b6f4c8..c1b0237 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -21,8 +21,8 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -645,8 +645,9 @@ public class Window {
           // We first apply a (trivial) transform to the input PCollection to produce a new
           // PCollection. This ensures that we don't modify the windowing strategy of the input
           // which may be used elsewhere.
-          .apply("Identity", ParDo.of(new OldDoFn<T, T>() {
-            @Override public void processElement(ProcessContext c) {
+          .apply("Identity", ParDo.of(new DoFn<T, T>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
               c.output(c.element());
             }
           }))


[10/13] incubator-beam git commit: Port Flink fork of examples to new DoFn

Posted by ke...@apache.org.
Port Flink fork of examples to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87313f1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87313f1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87313f1c

Branch: refs/heads/master
Commit: 87313f1c3d8cf874e04aaf528161478afa030f38
Parents: ae1f6d1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:24:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/examples/TFIDF.java      | 28 +++++++--------
 .../beam/runners/flink/examples/WordCount.java  |  5 +--
 .../flink/examples/streaming/AutoComplete.java  | 37 ++++++++++----------
 .../flink/examples/streaming/JoinExamples.java  | 14 ++++----
 .../examples/streaming/KafkaIOExamples.java     |  7 ++--
 .../KafkaWindowedWordCountExample.java          | 10 +++---
 .../examples/streaming/WindowedWordCount.java   | 10 +++---
 7 files changed, 57 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 716c8ad..4deca12 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -230,10 +230,10 @@ public class TFIDF {
       // Create a collection of pairs mapping a URI to each
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() {
+          .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
             private static final long serialVersionUID = 0;
 
-            @Override
+            @ProcessElement
             public void processElement(ProcessContext c) {
               URI uri = c.element().getKey();
               String line = c.element().getValue();
@@ -275,10 +275,10 @@ public class TFIDF {
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
           .apply("ShiftKeys", ParDo.of(
-              new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey().getKey();
                   String word = c.element().getKey().getValue();
@@ -316,10 +316,10 @@ public class TFIDF {
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
           .apply("ComputeTermFrequencies", ParDo.of(
-              new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -339,14 +339,14 @@ public class TFIDF {
       // documents in which the word appears divided by the total
       // number of documents in the corpus. Note how the total number of
       // documents is passed as a side input; the same value is
-      // presented to each invocation of the OldDoFn.
+      // presented to each invocation of the DoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
-              .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Long documentCount = c.element().getValue();
@@ -375,10 +375,10 @@ public class TFIDF {
 
       return wordToUriAndTfAndDf
           .apply("ComputeTfIdf", ParDo.of(
-              new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                 private static final long serialVersionUID = 0;
 
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Double df = c.element().getValue().getOnly(dfTag);
@@ -416,10 +416,10 @@ public class TFIDF {
     @Override
     public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
-          .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
+          .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
             private static final long serialVersionUID = 0;
 
-            @Override
+            @ProcessElement
             public void processElement(ProcessContext c) {
               c.output(String.format("%s,\t%s,\t%f",
                   c.element().getKey(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 080cdc9..fdffd39 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,11 +39,11 @@ import org.apache.beam.sdk.values.PCollection;
 
 public class WordCount {
 
-  public static class ExtractWordsFn extends OldDoFn<String, String> {
+  public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);
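
Note that the Aggregator field in ExtractWordsFn is untouched by the port: throughout this series, classes that now extend DoFn keep their createAggregator calls, so metrics declared this way carry over unchanged. A self-contained sketch of the same shape (the class name is illustrative, not from the commit):

    import org.apache.beam.sdk.transforms.Aggregator;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Sum;

    // Counts empty lines via an Aggregator while passing other lines through.
    class CountEmptyLinesFn extends DoFn<String, String> {
      private final Aggregator<Long, Long> emptyLines =
          createAggregator("emptyLines", new Sum.SumLongFn());

      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element().trim().isEmpty()) {
          emptyLines.addValue(1L);
        } else {
          c.output(c.element());
        }
      }
    }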

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 068404a..aff1a35 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -92,10 +93,10 @@ public class AutoComplete {
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(
-            new OldDoFn<KV<String, Long>, CompletionCandidate>() {
+            new DoFn<KV<String, Long>, CompletionCandidate>() {
               private static final long serialVersionUID = 0;
 
-              @Override
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
                 c.output(cand);
@@ -182,10 +183,10 @@ public class AutoComplete {
     }
 
     private static class FlattenTops
-        extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
       private static final long serialVersionUID = 0;
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) {
         for (CompletionCandidate cc : c.element().getValue()) {
           c.output(cc);
@@ -236,10 +237,10 @@ public class AutoComplete {
   }
 
   /**
-   * A OldDoFn that keys each candidate by all its prefixes.
+   * A DoFn that keys each candidate by all its prefixes.
    */
   private static class AllPrefixes
-      extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
     private static final long serialVersionUID = 0;
 
     private final int minPrefix;
@@ -251,7 +252,7 @@ public class AutoComplete {
       this.minPrefix = minPrefix;
       this.maxPrefix = maxPrefix;
     }
-    @Override
+    @ProcessElement
       public void processElement(ProcessContext c) {
       String word = c.element().value;
       for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
@@ -314,11 +315,11 @@ public class AutoComplete {
     }
   }
 
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
             createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);
@@ -337,21 +338,21 @@ public class AutoComplete {
   }
 
   /**
-   * Takes as input a the top candidates per prefix, and emits an entity
-   * suitable for writing to Datastore.
+   * Takes as input a the top candidates per prefix, and emits an entity suitable for writing to
+   * Datastore.
    */
-  static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String>
-          implements OldDoFn.RequiresWindowAccess{
+  static class FormatForPerTaskLocalFile
+      extends DoFn<KV<String, List<CompletionCandidate>>, String> {
 
     private static final long serialVersionUID = 0;
 
-    @Override
-    public void processElement(ProcessContext c) {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
       StringBuilder str = new StringBuilder();
       KV<String, List<CompletionCandidate>> elem = c.element();
 
-      str.append(elem.getKey() +" @ "+ c.window() +" -> ");
-      for(CompletionCandidate cand: elem.getValue()) {
+      str.append(elem.getKey() +" @ "+ window +" -> ");
+      for (CompletionCandidate cand: elem.getValue()) {
         str.append(cand.toString() + " ");
       }
       System.out.println(str.toString());
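
FormatForPerTaskLocalFile above shows the window-access migration: the old model required implementing the OldDoFn.RequiresWindowAccess marker interface and calling c.window(), while the new DoFn declares an extra BoundedWindow parameter on the @ProcessElement method and the runner supplies the element's window. The contrast in miniature (class names illustrative):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.OldDoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

    // Before: window access needs a marker interface plus c.window().
    class WindowedFormatFnOld extends OldDoFn<String, String>
        implements OldDoFn.RequiresWindowAccess {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element() + " @ " + c.window());
      }
    }

    // After: the window is an ordinary method parameter.
    class WindowedFormatFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        c.output(c.element() + " @ " + window);
      }
    }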

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 7d7c0c7..458a263 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -76,10 +76,10 @@ public class JoinExamples {
     // country code 'key' -> string of <event info>, <country name>
     PCollection<KV<String, String>> finalResultCollection =
         kvpCollection.apply("Process", ParDo.of(
-            new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
               private static final long serialVersionUID = 0;
 
-              @Override
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 KV<String, CoGbkResult> e = c.element();
                 String key = e.getKey();
@@ -98,10 +98,10 @@ public class JoinExamples {
             }));
 
     return finalResultCollection
-        .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
+        .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
           private static final long serialVersionUID = 0;
 
-          @Override
+          @ProcessElement
           public void processElement(ProcessContext c) {
             String result = c.element().getKey() + " -> " + c.element().getValue();
             System.out.println(result);
@@ -110,10 +110,10 @@ public class JoinExamples {
         }));
   }
 
-  static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> {
+  static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
     private static final long serialVersionUID = 0;
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String line = c.element().toLowerCase();
       String key = line.split("\\s")[0];

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 395b409..68a9edc 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -30,9 +30,10 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
@@ -326,9 +327,9 @@ public class KafkaIOExamples {
    * Print contents to stdout
    * @param <T> type of the input
    */
-  private static class PrintFn<T> extends OldDoFn<T, T> {
+  private static class PrintFn<T> extends DoFn<T, T> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       System.out.println(c.element().toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 8c31783..39ce225 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -49,11 +49,11 @@ public class KafkaWindowedWordCountExample {
   static final String GROUP_ID = "myGroup";  // Default groupId
   static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
 
-  public static class ExtractWordsFn extends OldDoFn<String, String> {
+  public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);
@@ -71,8 +71,8 @@ public class KafkaWindowedWordCountExample {
     }
   }
 
-  public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
-    @Override
+  public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
       System.out.println(row);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index d149e4e..fe8e627 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -59,19 +59,19 @@ public class WindowedWordCount {
   static final long WINDOW_SIZE = 10;  // Default window duration in seconds
   static final long SLIDE_SIZE = 5;  // Default window slide in seconds
 
-  static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
-    @Override
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
       c.output(row);
     }
   }
 
-  static class ExtractWordsFn extends OldDoFn<String, String> {
+  static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
         emptyLines.addValue(1L);


[07/13] incubator-beam git commit: Port Write to new DoFn

Posted by ke...@apache.org.
Port Write to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/86291de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/86291de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/86291de3

Branch: refs/heads/master
Commit: 86291de39772765f4d6d404ac8a8430d8ad8a15f
Parents: 2c6aaf7
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:49:37 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/Write.java | 26 ++++++++++----------
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 22 ++++++++++-------
 2 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 3e997b0..a846b7c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -156,7 +156,7 @@ public class Write {
      * Writes all the elements in a bundle using a {@link Writer} produced by the
      * {@link WriteOperation} associated with the {@link Sink}.
      */
-    private class WriteBundles<WriteT> extends OldDoFn<T, WriteT> {
+    private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
       // Writer that will write the records in this bundle. Lazily
       // initialized in processElement.
       private Writer<T, WriteT> writer = null;
@@ -166,7 +166,7 @@ public class Write {
         this.writeOperationView = writeOperationView;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         // Lazily initialize the Writer
         if (writer == null) {
@@ -182,7 +182,7 @@ public class Write {
           // Discard write result and close the write.
           try {
             writer.close();
-            // The writer does not need to be reset, as this OldDoFn cannot be reused.
+            // The writer does not need to be reset, as this DoFn cannot be reused.
           } catch (Exception closeException) {
             if (closeException instanceof InterruptedException) {
               // Do not silently ignore interrupted state.
@@ -195,7 +195,7 @@ public class Write {
         }
       }
 
-      @Override
+      @FinishBundle
       public void finishBundle(Context c) throws Exception {
         if (writer != null) {
           WriteT result = writer.close();
@@ -217,14 +217,14 @@ public class Write {
      *
      * @see WriteBundles
      */
-    private class WriteShardedBundles<WriteT> extends OldDoFn<KV<Integer, Iterable<T>>, WriteT> {
+    private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
       private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
 
       WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
         this.writeOperationView = writeOperationView;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         // In a sharded write, single input element represents one shard. We can open and close
         // the writer in each call to processElement.
@@ -296,8 +296,8 @@ public class Write {
      * <p>This singleton collection containing the WriteOperation is then used as a side input to a
      * ParDo over the PCollection of elements to write. In this bundle-writing phase,
      * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-     * {@link Writer#open} and {@link Writer#close} are called in {@link OldDoFn#startBundle} and
-     * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
+     * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
+     * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
      * every element in the bundle. The output of this ParDo is a PCollection of
      * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
      * each bundle.
@@ -334,8 +334,8 @@ public class Write {
       // Initialize the resource in a do-once ParDo on the WriteOperation.
       operationCollection = operationCollection
           .apply("Initialize", ParDo.of(
-              new OldDoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-            @Override
+              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();
               LOG.info("Initializing write operation {}", writeOperation);
@@ -388,8 +388,8 @@ public class Write {
       // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
       // collection as a side input), so it will happen after the parallel write.
       operationCollection
-          .apply("Finalize", ParDo.of(new OldDoFn<WriteOperation<T, WriteT>, Integer>() {
-            @Override
+          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               WriteOperation<T, WriteT> writeOperation = c.element();
               LOG.info("Finalizing write operation {}.", writeOperation);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 4b6e749..705b77c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.values.KV.of;
 
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,6 +30,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import static java.util.concurrent.ThreadLocalRandom.current;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -41,9 +44,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOption
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -73,7 +76,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -102,16 +104,18 @@ public class WriteTest {
       this.window = window;
     }
 
-    private static class AddArbitraryKey<T> extends OldDoFn<T, KV<Integer, T>> {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
+    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(of(current().nextInt(), c.element()));
       }
     }
 
-    private static class RemoveArbitraryKey<T> extends OldDoFn<KV<Integer, Iterable<T>>, T> {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
+    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
         for (T s : c.element().getValue()) {
           c.output(s);
         }


[04/13] incubator-beam git commit: Port Filter to the new DoFn

Posted by ke...@apache.org.
Port Filter to the new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d798413b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d798413b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d798413b

Branch: refs/heads/master
Commit: d798413be41fa5941d12049d899aa6ad970b8515
Parents: 7629f97
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:46:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/Filter.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d798413b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 37cbec1..2d9bdee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -202,8 +202,8 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
 
   @Override
   public PCollection<T> apply(PCollection<T> input) {
-    return input.apply(ParDo.of(new OldDoFn<T, T>() {
-      @Override
+    return input.apply(ParDo.of(new DoFn<T, T>() {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         if (predicate.apply(c.element())) {
           c.output(c.element());


[11/13] incubator-beam git commit: Port ViewTest to new DoFn

Posted by ke...@apache.org.
Port ViewTest to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1db02d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1db02d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1db02d2

Branch: refs/heads/master
Commit: b1db02d23f9454ff1a169d0aa81552e8dbe59fe3
Parents: 32f84bb
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:07:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ViewTest.java    | 192 ++++++++++---------
 1 file changed, 97 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1db02d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index ee240bf..170e6ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.values.KV.of;
+
 import static com.google.common.base.Preconditions.checkArgument;
 
 import static org.hamcrest.Matchers.isA;
@@ -100,8 +102,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("Create123", Create.of(1, 2, 3))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
@@ -131,8 +133,8 @@ public class ViewTest implements Serializable {
             TimestampedValue.of(3, new Instant(12))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
@@ -153,8 +155,8 @@ public class ViewTest implements Serializable {
             .apply(View.<Integer>asSingleton());
 
     pipeline.apply("Create123", Create.of(1, 2, 3))
-        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-          @Override
+        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
           }
@@ -178,8 +180,8 @@ public class ViewTest implements Serializable {
     final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton());
 
     oneTwoThree.apply(
-        "OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-          @Override
+        "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
           }
@@ -205,8 +207,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     checkArgument(c.sideInput(view).size() == 4);
                     checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
@@ -246,8 +248,8 @@ public class ViewTest implements Serializable {
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     checkArgument(c.sideInput(view).size() == 4);
                     checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
@@ -274,8 +276,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertFalse(c.sideInput(view).iterator().hasNext());
@@ -283,7 +285,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -300,8 +302,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
                       c.sideInput(view).clear();
@@ -329,7 +331,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
 
     pipeline.run();
@@ -347,8 +349,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer i : c.sideInput(view)) {
                       c.output(i);
@@ -387,8 +389,8 @@ public class ViewTest implements Serializable {
                     TimestampedValue.of(35, new Instant(11))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer i : c.sideInput(view)) {
                       c.output(i);
@@ -413,15 +415,15 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertFalse(c.sideInput(view).iterator().hasNext());
                     c.output(1);
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -438,8 +440,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     Iterator<Integer> iterator = c.sideInput(view).iterator();
                     while (iterator.hasNext()) {
@@ -453,7 +455,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
 
     pipeline.run();
@@ -472,11 +474,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
-                      c.output(KV.of(c.element(), v));
+                      c.output(of(c.element(), v));
                     }
                   }
                 }));
@@ -500,8 +502,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
                     assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
@@ -554,11 +556,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
-                      c.output(KV.of(c.element(), v));
+                      c.output(of(c.element(), v));
                     }
                   }
                 }));
@@ -591,13 +593,13 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                for (Integer v :
                                                    c.sideInput(view)
                                                        .get(c.element().substring(0, 1))) {
-                                                 c.output(KV.of(c.element(), v));
+                                                 c.output(of(c.element(), v));
                                                }
                                              }
                                            }));
@@ -629,8 +631,8 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(1 /* size */, new Instant(16))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<Integer, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<Integer, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                assertEquals((int) c.element(),
                                                    c.sideInput(view).size());
@@ -674,13 +676,13 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                for (Integer v :
                                                    c.sideInput(view)
                                                        .get(c.element().substring(0, 1))) {
-                                                 c.output(KV.of(c.element(), v));
+                                                 c.output(of(c.element(), v));
                                                }
                                              }
                                            }));
@@ -704,8 +706,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -714,7 +716,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -734,8 +736,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -744,7 +746,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -763,8 +765,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
                       c.sideInput(view).clear();
@@ -792,7 +794,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
 
     pipeline.run();
@@ -811,11 +813,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
-                        KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+                        of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
                   }
                 }));
 
@@ -838,8 +840,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
                     assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
@@ -870,11 +872,11 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
-                        KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+                        of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
                   }
                 }));
 
@@ -906,8 +908,8 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                c.output(KV.of(
                                                    c.element(),
@@ -943,8 +945,8 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(1 /* size */, new Instant(16))))
             .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<Integer, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<Integer, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                assertEquals((int) c.element(),
                                                    c.sideInput(view).size());
@@ -988,10 +990,10 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
-                                           new OldDoFn<String, KV<String, Integer>>() {
-                                             @Override
+                                           new DoFn<String, KV<String, Integer>>() {
+                                             @ProcessElement
                                              public void processElement(ProcessContext c) {
-                                               c.output(KV.of(
+                                               c.output(of(
                                                    c.element(),
                                                    c.sideInput(view).get(
                                                        c.element().substring(0, 1))));
@@ -1017,8 +1019,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -1027,7 +1029,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -1046,8 +1048,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -1056,7 +1058,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
 
     pipeline.run();
@@ -1080,8 +1082,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
                         KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
@@ -1111,8 +1113,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
                       c.sideInput(view).clear();
@@ -1139,7 +1141,7 @@ public class ViewTest implements Serializable {
                   }
                 }));
 
-    // Pass at least one value through to guarantee that OldDoFn executes.
+    // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
 
     pipeline.run();
@@ -1158,8 +1160,8 @@ public class ViewTest implements Serializable {
     PCollection<KV<String, Integer>> output =
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
             .apply("Output",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(KV
                         .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
@@ -1193,8 +1195,8 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new OldDoFn<String, String>() {
-                                                    @Override
+                                                  new DoFn<String, String>() {
+                                                    @ProcessElement
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
                                                     }
@@ -1226,8 +1228,8 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new OldDoFn<String, String>() {
-                                                    @Override
+                                                  new DoFn<String, String>() {
+                                                    @ProcessElement
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
                                                     }
@@ -1257,8 +1259,8 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("C", new Instant(7))))
             .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
-                                                  new OldDoFn<String, String>() {
-                                                    @Override
+                                                  new DoFn<String, String>() {
+                                                    @ProcessElement
                                                     public void processElement(ProcessContext c) {
                                                       c.output(c.element() + c.sideInput(view));
                                                     }
@@ -1287,8 +1289,8 @@ public class ViewTest implements Serializable {
         p.apply("CreateMainInput", Create.of(""))
             .apply(
                 "OutputMainAndSideInputs",
-                ParDo.withSideInputs(view).of(new OldDoFn<String, String>() {
-                  @Override
+                ParDo.withSideInputs(view).of(new DoFn<String, String>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.element() + c.sideInput(view));
                   }
@@ -1305,8 +1307,8 @@ public class ViewTest implements Serializable {
     Pipeline pipeline = TestPipeline.create();
     final PCollectionView<Iterable<Integer>> view1 =
         pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))
-            .apply("OutputOneInteger", ParDo.of(new OldDoFn<Void, Integer>() {
-              @Override
+            .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 c.output(17);
               }
@@ -1317,8 +1319,8 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of()))
             .apply(
                 "OutputSideInput",
-                ParDo.withSideInputs(view1).of(new OldDoFn<Void, Iterable<Integer>>() {
-                  @Override
+                ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view1));
                   }
@@ -1328,8 +1330,8 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of()))
             .apply("ReadIterableSideInput",
-                ParDo.withSideInputs(view2).of(new OldDoFn<Void, Integer>() {
-                  @Override
+                ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() {
+                  @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Iterable<Integer> input : c.sideInput(view2)) {
                       for (Integer i : input) {


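The change repeated in every hunk above is mechanical: each anonymous OldDoFn with an @Override of processElement becomes an anonymous DoFn whose processElement is annotated with @ProcessElement; beyond that, only the comments mentioning OldDoFn and a few KV.of call sites (via the new static import) change. A self-contained sketch of the ported side-input pattern, drawn from the diff (the wrapper class and the value 47 are illustrative, not part of ViewTest):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    public class SideInputPortSketch {
      public static void main(String[] args) {
        Pipeline pipeline = TestPipeline.create();

        // Singleton side input over a one-element collection.
        final PCollectionView<Integer> view =
            pipeline.apply("CreateSideInput", Create.of(47))
                .apply(View.<Integer>asSingleton());

        // New-DoFn form: @ProcessElement replaces OldDoFn's @Override.
        PCollection<Integer> output =
            pipeline.apply("Create123", Create.of(1, 2, 3))
                .apply("OutputSideInputs",
                    ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        c.output(c.sideInput(view));
                      }
                    }));

        pipeline.run();
      }
    }

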
[12/13] incubator-beam git commit: Port Flink integration tests to new DoFn

Posted by ke...@apache.org.
Port Flink integration tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ae1f6d18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ae1f6d18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ae1f6d18

Branch: refs/heads/master
Commit: ae1f6d181ebe3c0bdffc35c833a6fdc858937d6c
Parents: 879f18f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:17:20 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/flink/ReadSourceITCase.java | 6 +++---
 .../apache/beam/runners/flink/ReadSourceStreamingITCase.java | 8 +++++---
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae1f6d18/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index ca70096..516c7ba 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -72,8 +72,8 @@ public class ReadSourceITCase extends JavaProgramTestBase {
 
     PCollection<String> result = p
         .apply(CountingInput.upTo(10))
-        .apply(ParDo.of(new OldDoFn<Long, String>() {
-          @Override
+        .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             c.output(c.element().toString());
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae1f6d18/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index bc69f34..ea58d0d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -20,9 +20,11 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+
 import com.google.common.base.Joiner;
+
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
 /**
@@ -59,8 +61,8 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
 
     p
       .apply(CountingInput.upTo(10))
-      .apply(ParDo.of(new OldDoFn<Long, String>() {
-          @Override
+      .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
             c.output(c.element().toString());
           }


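Both Flink integration tests follow the same recipe. Isolated as a standalone sketch (the runProgram wrapper and the resultPath parameter are illustrative stand-ins for the Flink test harness, and the TextIO.Write sink is an assumption suggested by the tests' imports):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.CountingInput;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadSourcePortSketch {
      // Mirrors the ported pipeline body: read 0..9, format each Long
      // as a String with the new DoFn, then write the results out.
      static void runProgram(Pipeline p, String resultPath) {
        PCollection<String> result = p
            .apply(CountingInput.upTo(10))
            .apply(ParDo.of(new DoFn<Long, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) throws Exception {
                c.output(c.element().toString());
              }
            }));
        result.apply(TextIO.Write.to(resultPath));
        p.run();
      }
    }

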
[13/13] incubator-beam git commit: This closes #797

Posted by ke...@apache.org.
This closes #797


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/574c3777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/574c3777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/574c3777

Branch: refs/heads/master
Commit: 574c3777ddb6d9e8f5afb39e7ec14913ea52a8e8
Parents: bb00810 f5df358
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Aug 8 13:40:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 13:40:21 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/flink/examples/TFIDF.java      |  28 +--
 .../beam/runners/flink/examples/WordCount.java  |   5 +-
 .../flink/examples/streaming/AutoComplete.java  |  37 ++--
 .../flink/examples/streaming/JoinExamples.java  |  14 +-
 .../examples/streaming/KafkaIOExamples.java     |   7 +-
 .../KafkaWindowedWordCountExample.java          |  10 +-
 .../examples/streaming/WindowedWordCount.java   |  10 +-
 .../beam/runners/flink/ReadSourceITCase.java    |   6 +-
 .../flink/ReadSourceStreamingITCase.java        |   8 +-
 .../apache/beam/runners/spark/TfIdfTest.java    |  22 +--
 .../spark/translation/CombinePerKeyTest.java    |   6 +-
 .../translation/MultiOutputWordCountTest.java   |  10 +-
 .../spark/translation/SerializationTest.java    |  10 +-
 .../streaming/KafkaStreamingTest.java           |   6 +-
 .../main/java/org/apache/beam/sdk/io/Write.java |  26 +--
 .../org/apache/beam/sdk/transforms/Combine.java |  20 +-
 .../org/apache/beam/sdk/transforms/Filter.java  |   4 +-
 .../beam/sdk/transforms/windowing/Window.java   |   7 +-
 .../org/apache/beam/sdk/util/Reshuffle.java     |   6 +-
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  22 ++-
 .../apache/beam/sdk/transforms/DoFnTest.java    |  29 +--
 .../apache/beam/sdk/transforms/ViewTest.java    | 192 ++++++++++---------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  21 +-
 .../sdk/transforms/WithTimestampsJava8Test.java |   6 +-
 24 files changed, 248 insertions(+), 264 deletions(-)
----------------------------------------------------------------------



[05/13] incubator-beam git commit: Port Reshuffle to new DoFn

Posted by ke...@apache.org.
Port Reshuffle to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf21a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf21a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf21a5c

Branch: refs/heads/master
Commit: ecf21a5cc177c39e515e4c78e16b579ac298c999
Parents: d798413
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:47:23 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf21a5c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 66c7cc0..ad33a25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -70,8 +70,8 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
         // set allowed lateness.
         .setWindowingStrategyInternal(originalStrategy)
         .apply("ExpandIterable", ParDo.of(
-            new OldDoFn<KV<K, Iterable<V>>, KV<K, V>>() {
-              @Override
+            new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 K key = c.element().getKey();
                 for (V value : c.element().getValue()) {


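The ported "ExpandIterable" fn in isolation: after the GroupByKey, it flattens each KV<K, Iterable<V>> back into one KV<K, V> per value. A sketch with the fn lifted to a named class (the name ExpandIterableFn is illustrative; in Reshuffle it stays anonymous):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class ExpandIterableFn<K, V> extends DoFn<KV<K, Iterable<V>>, KV<K, V>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        K key = c.element().getKey();
        // Emit one output pair per grouped value, preserving the key.
        for (V value : c.element().getValue()) {
          c.output(KV.of(key, value));
        }
      }
    }

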
[02/13] incubator-beam git commit: Port BigQueryIO to new DoFn

Posted by ke...@apache.org.
Port BigQueryIO to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6395e9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6395e9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6395e9d

Branch: refs/heads/master
Commit: d6395e9d45dcbeb9b3d3e2f8214a49866622b9cf
Parents: 87313f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:26:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 21 ++++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6395e9d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ed2c32e..36e09f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -1785,7 +1784,7 @@ public class BigQueryIO {
         return PDone.in(input.getPipeline());
       }
 
-      private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> {
+      private class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
         private TableRowWriter writer = null;
         private final String tempFilePrefix;
 
@@ -1793,7 +1792,7 @@ public class BigQueryIO {
           this.tempFilePrefix = tempFilePrefix;
         }
 
-        @Override
+        @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           if (writer == null) {
             writer = new TableRowWriter(tempFilePrefix);
@@ -1806,7 +1805,7 @@ public class BigQueryIO {
             // Discard write result and close the write.
             try {
               writer.close();
-              // The writer does not need to be reset, as this OldDoFn cannot be reused.
+              // The writer does not need to be reset, as this DoFn cannot be reused.
             } catch (Exception closeException) {
               // Do not mask the exception that caused the write to fail.
               e.addSuppressed(closeException);
@@ -1815,7 +1814,7 @@ public class BigQueryIO {
           }
         }
 
-        @Override
+        @FinishBundle
         public void finishBundle(Context c) throws Exception {
           if (writer != null) {
             c.output(writer.close());
@@ -1959,7 +1958,7 @@ public class BigQueryIO {
     /**
      * Partitions temporary files based on number of files and file sizes.
      */
-    static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> {
+    static class WritePartition extends DoFn<String, KV<Long, List<String>>> {
       private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
       private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
       private TupleTag<KV<Long, List<String>>> singlePartitionTag;
@@ -1973,7 +1972,7 @@ public class BigQueryIO {
         this.singlePartitionTag = singlePartitionTag;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
         if (results.isEmpty()) {
@@ -2015,7 +2014,7 @@ public class BigQueryIO {
     /**
      * Writes partitions to BigQuery tables.
      */
-    static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> {
+    static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
       private final boolean singlePartition;
       private final BigQueryServices bqServices;
       private final String jobIdToken;
@@ -2044,7 +2043,7 @@ public class BigQueryIO {
         this.createDisposition = createDisposition;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
         String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
@@ -2149,7 +2148,7 @@ public class BigQueryIO {
     /**
      * Copies temporary tables to destination table.
      */
-    static class WriteRename extends OldDoFn<String, Void> {
+    static class WriteRename extends DoFn<String, Void> {
       private final BigQueryServices bqServices;
       private final String jobIdToken;
       private final String jsonTableRef;
@@ -2172,7 +2171,7 @@ public class BigQueryIO {
         this.tempTablesView = tempTablesView;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
 


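Beyond @ProcessElement, this port shows the full lifecycle mapping: OldDoFn's overridden finishBundle becomes a method annotated @FinishBundle, still taking a Context. A minimal sketch of the shape WriteBundles now has (BufferingFn and its per-bundle buffer are hypothetical stand-ins for the TableRowWriter plumbing):

    import org.apache.beam.sdk.transforms.DoFn;

    class BufferingFn extends DoFn<String, Long> {
      // Per-bundle state, created lazily like WriteBundles' writer.
      private transient StringBuilder buffer;

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        if (buffer == null) {
          buffer = new StringBuilder();
        }
        buffer.append(c.element());
      }

      @FinishBundle
      public void finishBundle(Context c) throws Exception {
        if (buffer != null) {
          c.output((long) buffer.length());  // emit one result per bundle
          buffer = null;
        }
      }
    }

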
[08/13] incubator-beam git commit: Remove references to OldDoFn from DoFnTest

Posted by ke...@apache.org.
Remove references to OldDoFn from DoFnTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32f84bb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32f84bb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32f84bb4

Branch: refs/heads/master
Commit: 32f84bb4d442a9b2060a4a5fbdc77d14d6b0976b
Parents: 86291de
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:54:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTest.java    | 29 +-------------------
 1 file changed, 1 insertion(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32f84bb4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index c7e8972..710e4ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -22,10 +22,6 @@ import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -140,29 +136,6 @@ public class DoFnTest implements Serializable {
   }
 
   @Test
-  public void testDoFnWithContextUsingAggregators() {
-    NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
-    OldDoFn<Object, Object>.Context context = noOpFn.context();
-
-    OldDoFn<Object, Object> fn = spy(noOpFn);
-    context = spy(context);
-
-    @SuppressWarnings("unchecked")
-    Aggregator<Long, Long> agg = mock(Aggregator.class);
-
-    Sum.SumLongFn combiner = new Sum.SumLongFn();
-    Aggregator<Long, Long> delegateAggregator =
-        fn.createAggregator("test", combiner);
-
-    when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
-    context.setupDelegateAggregators();
-    delegateAggregator.addValue(1L);
-
-    verify(agg).addValue(1L);
-  }
-
-  @Test
   public void testDefaultPopulateDisplayDataImplementation() {
     DoFn<String, String> fn = new DoFn<String, String>() {
     };
@@ -225,7 +198,7 @@ public class DoFnTest implements Serializable {
   }
 
   /**
-   * Initialize a test pipeline with the specified {@link OldDoFn}.
+   * Initialize a test pipeline with the specified {@link DoFn}.
    */
   private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) {
     TestPipeline pipeline = TestPipeline.create();


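The deleted test exercised OldDoFn's delegate-aggregator plumbing (context.createAggregatorInternal plus setupDelegateAggregators), which has no mock-based equivalent worth keeping for the new DoFn. Assuming the Aggregator API itself carries over unchanged to the new DoFn — a sketch, not something this diff confirms — counting elements would look like:

    import org.apache.beam.sdk.transforms.Aggregator;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Sum;

    class CountingFn extends DoFn<String, String> {
      // Assumption: DoFn#createAggregator, mirroring OldDoFn's API.
      private final Aggregator<Long, Long> count =
          createAggregator("count", new Sum.SumLongFn());

      @ProcessElement
      public void processElement(ProcessContext c) {
        count.addValue(1L);
        c.output(c.element());
      }
    }

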
[03/13] incubator-beam git commit: Port most of Combine to new DoFn

Posted by ke...@apache.org.
Port most of Combine to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/331f5234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/331f5234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/331f5234

Branch: refs/heads/master
Commit: 331f523461094af666a20bd97e1e15f1dec3feba
Parents: b1db02d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:11:11 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/331f5234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6fc2324..a825800 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1473,9 +1473,9 @@ public class Combine {
       PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
           .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
           .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
-              new OldDoFn<Void, OutputT>() {
-                @Override
-                public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) {
+              new DoFn<Void, OutputT>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
                   Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator();
                   if (!combined.hasNext()) {
                     c.output(defaultValue);
@@ -2097,15 +2097,15 @@ public class Combine {
       final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
       final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
       PCollectionTuple split = input.apply("AddNonce", ParDo.of(
-          new OldDoFn<KV<K, InputT>, KV<K, InputT>>() {
+          new DoFn<KV<K, InputT>, KV<K, InputT>>() {
             transient int counter;
-            @Override
+            @StartBundle
             public void startBundle(Context c) {
               counter = ThreadLocalRandom.current().nextInt(
                   Integer.MAX_VALUE);
             }
 
-            @Override
+            @ProcessElement
             public void processElement(ProcessContext c) {
               KV<K, InputT> kv = c.element();
               int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
@@ -2135,9 +2135,9 @@ public class Combine {
           .setWindowingStrategyInternal(preCombineStrategy)
           .apply("PreCombineHot", Combine.perKey(hotPreCombine))
           .apply("StripNonce", ParDo.of(
-              new OldDoFn<KV<KV<K, Integer>, AccumT>,
+              new DoFn<KV<KV<K, Integer>, AccumT>,
                                      KV<K, InputOrAccum<InputT, AccumT>>>() {
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.output(KV.of(
                       c.element().getKey().getKey(),
@@ -2151,8 +2151,8 @@ public class Combine {
           .get(cold)
           .setCoder(inputCoder)
           .apply("PrepareCold", ParDo.of(
-              new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
-                @Override
+              new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.output(KV.of(c.element().getKey(),
                                  InputOrAccum.<InputT, AccumT>input(c.element().getValue())));