You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:36 UTC
[01/13] incubator-beam git commit: Port various Spark runner tests to
new DoFn
Repository: incubator-beam
Updated Branches:
refs/heads/master bb00810ad -> 574c3777d
Port various Spark runner tests to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5df3583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5df3583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5df3583
Branch: refs/heads/master
Commit: f5df358320cfde6a1c4d012d4169af691f6a18e9
Parents: d6395e9
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:31:07 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/spark/TfIdfTest.java | 22 ++++++++++----------
.../spark/translation/CombinePerKeyTest.java | 6 +++---
.../translation/MultiOutputWordCountTest.java | 10 ++++-----
.../spark/translation/SerializationTest.java | 10 ++++-----
.../streaming/KafkaStreamingTest.java | 6 +++---
5 files changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 074e6aa..17bf6dd 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -101,8 +101,8 @@ public class TfIdfTest {
// of the words in the document associated with that that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
.apply("SplitWords", ParDo.of(
- new OldDoFn<KV<URI, String>, KV<URI, String>>() {
- @Override
+ new DoFn<KV<URI, String>, KV<URI, String>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
String line = c.element().getValue();
@@ -144,8 +144,8 @@ public class TfIdfTest {
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
.apply("ShiftKeys", ParDo.of(
- new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
- @Override
+ new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey().getKey();
String word = c.element().getKey().getValue();
@@ -183,8 +183,8 @@ public class TfIdfTest {
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
.apply("ComputeTermFrequencies", ParDo.of(
- new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- @Override
+ new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -208,8 +208,8 @@ public class TfIdfTest {
PCollection<KV<String, Double>> wordToDf = wordToDocCount
.apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
- .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
- @Override
+ .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().getKey();
Long documentCount = c.element().getValue();
@@ -237,8 +237,8 @@ public class TfIdfTest {
// divided by the log of the document frequency.
return wordToUriAndTfAndDf
.apply("ComputeTfIdf", ParDo.of(
- new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
- @Override
+ new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().getKey();
Double df = c.element().getValue().getOnly(dfTag);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index dee9213..cdf2cfb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -70,8 +70,8 @@ public class CombinePerKeyTest {
private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
@Override
public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
- PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new OldDoFn<T, KV<T, Long>>() {
- @Override
+ PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
+ @ProcessElement
public void processElement(ProcessContext processContext) throws Exception {
processContext.output(KV.of(processContext.element(), 1L));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 066521b..291f7b2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.ApproximateUnique;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -105,7 +105,7 @@ public class MultiOutputWordCountTest {
/**
* A OldDoFn that tokenizes lines of text into individual words.
*/
- static class ExtractWordsFn extends OldDoFn<String, String> {
+ static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
new Sum.SumIntegerFn());
@@ -117,7 +117,7 @@ public class MultiOutputWordCountTest {
this.regex = regex;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String[] words = c.element().split(c.sideInput(regex));
for (String word : words) {
@@ -170,8 +170,8 @@ public class MultiOutputWordCountTest {
}
}
- private static class FormatCountsFn extends OldDoFn<KV<String, Long>, String> {
- @Override
+ private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ": " + c.element().getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index fb97b8b..019b107 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -147,12 +147,12 @@ public class SerializationTest {
/**
* A OldDoFn that tokenizes lines of text into individual words.
*/
- static class ExtractWordsFn extends OldDoFn<StringHolder, StringHolder> {
+ static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> {
private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
// Split the line into words.
String[] words = WORD_BOUNDARY.split(c.element().toString());
@@ -175,8 +175,8 @@ public class SerializationTest {
/**
* A OldDoFn that converts a Word and Count into a printable string.
*/
- private static class FormatCountsFn extends OldDoFn<KV<StringHolder, Long>, StringHolder> {
- @Override
+ private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fa98ca3..17044aa 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -121,8 +121,8 @@ public class KafkaStreamingTest {
EMBEDDED_ZOOKEEPER.shutdown();
}
- private static class FormatKVFn extends OldDoFn<KV<String, String>, String> {
- @Override
+ private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + "," + c.element().getValue());
}
[09/13] incubator-beam git commit: Port Java 8 tests to new DoFn
Posted by ke...@apache.org.
Port Java 8 tests to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/879f18fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/879f18fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/879f18fd
Branch: refs/heads/master
Commit: 879f18fd1694b0540e3e695416566f24220ecb1e
Parents: 331f523
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:12:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/WithTimestampsJava8Test.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/879f18fd/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
index 1141e88..03aa647 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
@@ -47,9 +47,9 @@ public class WithTimestampsJava8Test implements Serializable {
.apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(yearTwoThousand))));
PCollection<KV<String, Instant>> timestampedVals =
- timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
- @Override
- public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+ timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
[06/13] incubator-beam git commit: Port Window transform to new DoFn
Posted by ke...@apache.org.
Port Window transform to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c6aaf73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c6aaf73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c6aaf73
Branch: refs/heads/master
Commit: 2c6aaf730353c4db12aea60fd89851bddec0415c
Parents: ecf21a5
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:47:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/windowing/Window.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c6aaf73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 5b6f4c8..c1b0237 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -21,8 +21,8 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -645,8 +645,9 @@ public class Window {
// We first apply a (trivial) transform to the input PCollection to produce a new
// PCollection. This ensures that we don't modify the windowing strategy of the input
// which may be used elsewhere.
- .apply("Identity", ParDo.of(new OldDoFn<T, T>() {
- @Override public void processElement(ProcessContext c) {
+ .apply("Identity", ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
c.output(c.element());
}
}))
[10/13] incubator-beam git commit: Port Flink fork of examples to new
DoFn
Posted by ke...@apache.org.
Port Flink fork of examples to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87313f1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87313f1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87313f1c
Branch: refs/heads/master
Commit: 87313f1c3d8cf874e04aaf528161478afa030f38
Parents: ae1f6d1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:24:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../beam/runners/flink/examples/TFIDF.java | 28 +++++++--------
.../beam/runners/flink/examples/WordCount.java | 5 +--
.../flink/examples/streaming/AutoComplete.java | 37 ++++++++++----------
.../flink/examples/streaming/JoinExamples.java | 14 ++++----
.../examples/streaming/KafkaIOExamples.java | 7 ++--
.../KafkaWindowedWordCountExample.java | 10 +++---
.../examples/streaming/WindowedWordCount.java | 10 +++---
7 files changed, 57 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 716c8ad..4deca12 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
@@ -230,10 +230,10 @@ public class TFIDF {
// Create a collection of pairs mapping a URI to each
// of the words in the document associated with that that URI.
PCollection<KV<URI, String>> uriToWords = uriToContent
- .apply("SplitWords", ParDo.of(new OldDoFn<KV<URI, String>, KV<URI, String>>() {
+ .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
String line = c.element().getValue();
@@ -275,10 +275,10 @@ public class TFIDF {
// by the URI key.
PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
.apply("ShiftKeys", ParDo.of(
- new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+ new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey().getKey();
String word = c.element().getKey().getValue();
@@ -316,10 +316,10 @@ public class TFIDF {
// divided by the total number of words in the document.
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
.apply("ComputeTermFrequencies", ParDo.of(
- new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
URI uri = c.element().getKey();
Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -339,14 +339,14 @@ public class TFIDF {
// documents in which the word appears divided by the total
// number of documents in the corpus. Note how the total number of
// documents is passed as a side input; the same value is
- // presented to each invocation of the OldDoFn.
+ // presented to each invocation of the DoFn.
PCollection<KV<String, Double>> wordToDf = wordToDocCount
.apply("ComputeDocFrequencies", ParDo
.withSideInputs(totalDocuments)
- .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
+ .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().getKey();
Long documentCount = c.element().getValue();
@@ -375,10 +375,10 @@ public class TFIDF {
return wordToUriAndTfAndDf
.apply("ComputeTfIdf", ParDo.of(
- new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+ new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().getKey();
Double df = c.element().getValue().getOnly(dfTag);
@@ -416,10 +416,10 @@ public class TFIDF {
@Override
public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
return wordToUriAndTfIdf
- .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(String.format("%s,\t%s,\t%f",
c.element().getKey(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 080cdc9..fdffd39 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -38,11 +39,11 @@ import org.apache.beam.sdk.values.PCollection;
public class WordCount {
- public static class ExtractWordsFn extends OldDoFn<String, String> {
+ public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 068404a..aff1a35 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -92,10 +93,10 @@ public class AutoComplete {
// Map the KV outputs of Count into our own CompletionCandiate class.
.apply("CreateCompletionCandidates", ParDo.of(
- new OldDoFn<KV<String, Long>, CompletionCandidate>() {
+ new DoFn<KV<String, Long>, CompletionCandidate>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
c.output(cand);
@@ -182,10 +183,10 @@ public class AutoComplete {
}
private static class FlattenTops
- extends OldDoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
+ extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for (CompletionCandidate cc : c.element().getValue()) {
c.output(cc);
@@ -236,10 +237,10 @@ public class AutoComplete {
}
/**
- * A OldDoFn that keys each candidate by all its prefixes.
+ * A DoFn that keys each candidate by all its prefixes.
*/
private static class AllPrefixes
- extends OldDoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
+ extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
private static final long serialVersionUID = 0;
private final int minPrefix;
@@ -251,7 +252,7 @@ public class AutoComplete {
this.minPrefix = minPrefix;
this.maxPrefix = maxPrefix;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().value;
for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
@@ -314,11 +315,11 @@ public class AutoComplete {
}
}
- static class ExtractWordsFn extends OldDoFn<String, String> {
+ static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
@@ -337,21 +338,21 @@ public class AutoComplete {
}
/**
- * Takes as input a the top candidates per prefix, and emits an entity
- * suitable for writing to Datastore.
+ * Takes as input a the top candidates per prefix, and emits an entity suitable for writing to
+ * Datastore.
*/
- static class FormatForPerTaskLocalFile extends OldDoFn<KV<String, List<CompletionCandidate>>, String>
- implements OldDoFn.RequiresWindowAccess{
+ static class FormatForPerTaskLocalFile
+ extends DoFn<KV<String, List<CompletionCandidate>>, String> {
private static final long serialVersionUID = 0;
- @Override
- public void processElement(ProcessContext c) {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
StringBuilder str = new StringBuilder();
KV<String, List<CompletionCandidate>> elem = c.element();
- str.append(elem.getKey() +" @ "+ c.window() +" -> ");
- for(CompletionCandidate cand: elem.getValue()) {
+ str.append(elem.getKey() +" @ "+ window +" -> ");
+ for (CompletionCandidate cand: elem.getValue()) {
str.append(cand.toString() + " ");
}
System.out.println(str.toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 7d7c0c7..458a263 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -76,10 +76,10 @@ public class JoinExamples {
// country code 'key' -> string of <event info>, <country name>
PCollection<KV<String, String>> finalResultCollection =
kvpCollection.apply("Process", ParDo.of(
- new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+ new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String key = e.getKey();
@@ -98,10 +98,10 @@ public class JoinExamples {
}));
return finalResultCollection
- .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
+ .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String result = c.element().getKey() + " -> " + c.element().getValue();
System.out.println(result);
@@ -110,10 +110,10 @@ public class JoinExamples {
}));
}
- static class ExtractEventDataFn extends OldDoFn<String, KV<String, String>> {
+ static class ExtractEventDataFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
String line = c.element().toLowerCase();
String key = line.split("\\s")[0];
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 395b409..68a9edc 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -30,9 +30,10 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
@@ -326,9 +327,9 @@ public class KafkaIOExamples {
* Print contents to stdout
* @param <T> type of the input
*/
- private static class PrintFn<T> extends OldDoFn<T, T> {
+ private static class PrintFn<T> extends DoFn<T, T> {
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
System.out.println(c.element().toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 8c31783..39ce225 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -49,11 +49,11 @@ public class KafkaWindowedWordCountExample {
static final String GROUP_ID = "myGroup"; // Default groupId
static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
- public static class ExtractWordsFn extends OldDoFn<String, String> {
+ public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
@@ -71,8 +71,8 @@ public class KafkaWindowedWordCountExample {
}
}
- public static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
- @Override
+ public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
System.out.println(row);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index d149e4e..fe8e627 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -59,19 +59,19 @@ public class WindowedWordCount {
static final long WINDOW_SIZE = 10; // Default window duration in seconds
static final long SLIDE_SIZE = 5; // Default window slide in seconds
- static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> {
- @Override
+ static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
c.output(row);
}
}
- static class ExtractWordsFn extends OldDoFn<String, String> {
+ static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
[07/13] incubator-beam git commit: Port Write to new DoFn
Posted by ke...@apache.org.
Port Write to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/86291de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/86291de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/86291de3
Branch: refs/heads/master
Commit: 86291de39772765f4d6d404ac8a8430d8ad8a15f
Parents: 2c6aaf7
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:49:37 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/Write.java | 26 ++++++++++----------
.../java/org/apache/beam/sdk/io/WriteTest.java | 22 ++++++++++-------
2 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 3e997b0..a846b7c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -156,7 +156,7 @@ public class Write {
* Writes all the elements in a bundle using a {@link Writer} produced by the
* {@link WriteOperation} associated with the {@link Sink}.
*/
- private class WriteBundles<WriteT> extends OldDoFn<T, WriteT> {
+ private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
// Writer that will write the records in this bundle. Lazily
// initialized in processElement.
private Writer<T, WriteT> writer = null;
@@ -166,7 +166,7 @@ public class Write {
this.writeOperationView = writeOperationView;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
// Lazily initialize the Writer
if (writer == null) {
@@ -182,7 +182,7 @@ public class Write {
// Discard write result and close the write.
try {
writer.close();
- // The writer does not need to be reset, as this OldDoFn cannot be reused.
+ // The writer does not need to be reset, as this DoFn cannot be reused.
} catch (Exception closeException) {
if (closeException instanceof InterruptedException) {
// Do not silently ignore interrupted state.
@@ -195,7 +195,7 @@ public class Write {
}
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
if (writer != null) {
WriteT result = writer.close();
@@ -217,14 +217,14 @@ public class Write {
*
* @see WriteBundles
*/
- private class WriteShardedBundles<WriteT> extends OldDoFn<KV<Integer, Iterable<T>>, WriteT> {
+ private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
this.writeOperationView = writeOperationView;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
// In a sharded write, single input element represents one shard. We can open and close
// the writer in each call to processElement.
@@ -296,8 +296,8 @@ public class Write {
* <p>This singleton collection containing the WriteOperation is then used as a side input to a
* ParDo over the PCollection of elements to write. In this bundle-writing phase,
* {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
- * {@link Writer#open} and {@link Writer#close} are called in {@link OldDoFn#startBundle} and
- * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
+ * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
+ * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
* every element in the bundle. The output of this ParDo is a PCollection of
* <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
* each bundle.
@@ -334,8 +334,8 @@ public class Write {
// Initialize the resource in a do-once ParDo on the WriteOperation.
operationCollection = operationCollection
.apply("Initialize", ParDo.of(
- new OldDoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
- @Override
+ new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
WriteOperation<T, WriteT> writeOperation = c.element();
LOG.info("Initializing write operation {}", writeOperation);
@@ -388,8 +388,8 @@ public class Write {
// ParDo. There is a dependency between this ParDo and the parallel write (the writer results
// collection as a side input), so it will happen after the parallel write.
operationCollection
- .apply("Finalize", ParDo.of(new OldDoFn<WriteOperation<T, WriteT>, Integer>() {
- @Override
+ .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
WriteOperation<T, WriteT> writeOperation = c.element();
LOG.info("Finalizing write operation {}.", writeOperation);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 4b6e749..705b77c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+import static org.apache.beam.sdk.values.KV.of;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -29,6 +30,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static java.util.concurrent.ThreadLocalRandom.current;
+
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
@@ -41,9 +44,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOption
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -73,7 +76,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -102,16 +104,18 @@ public class WriteTest {
this.window = window;
}
- private static class AddArbitraryKey<T> extends OldDoFn<T, KV<Integer, T>> {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
+ private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(of(current().nextInt(), c.element()));
}
}
- private static class RemoveArbitraryKey<T> extends OldDoFn<KV<Integer, Iterable<T>>, T> {
- @Override
- public void processElement(ProcessContext c) throws Exception {
+ private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
for (T s : c.element().getValue()) {
c.output(s);
}
[04/13] incubator-beam git commit: Port Filter to the new DoFn
Posted by ke...@apache.org.
Port Filter to the new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d798413b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d798413b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d798413b
Branch: refs/heads/master
Commit: d798413be41fa5941d12049d899aa6ad970b8515
Parents: 7629f97
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:46:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/transforms/Filter.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d798413b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 37cbec1..2d9bdee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -202,8 +202,8 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> input) {
- return input.apply(ParDo.of(new OldDoFn<T, T>() {
- @Override
+ return input.apply(ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
if (predicate.apply(c.element())) {
c.output(c.element());
[11/13] incubator-beam git commit: Port ViewTest to new DoFn
Posted by ke...@apache.org.
Port ViewTest to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1db02d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1db02d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1db02d2
Branch: refs/heads/master
Commit: b1db02d23f9454ff1a169d0aa81552e8dbe59fe3
Parents: 32f84bb
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:07:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/ViewTest.java | 192 ++++++++++---------
1 file changed, 97 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1db02d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index ee240bf..170e6ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.values.KV.of;
+
import static com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.isA;
@@ -100,8 +102,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("Create123", Create.of(1, 2, 3))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
@@ -131,8 +133,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of(3, new Instant(12))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
@@ -153,8 +155,8 @@ public class ViewTest implements Serializable {
.apply(View.<Integer>asSingleton());
pipeline.apply("Create123", Create.of(1, 2, 3))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
@@ -178,8 +180,8 @@ public class ViewTest implements Serializable {
final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton());
oneTwoThree.apply(
- "OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
@@ -205,8 +207,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29, 31))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
checkArgument(c.sideInput(view).size() == 4);
checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
@@ -246,8 +248,8 @@ public class ViewTest implements Serializable {
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
checkArgument(c.sideInput(view).size() == 4);
checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
@@ -274,8 +276,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertFalse(c.sideInput(view).iterator().hasNext());
@@ -283,7 +285,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -300,8 +302,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
c.sideInput(view).clear();
@@ -329,7 +331,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(11);
pipeline.run();
@@ -347,8 +349,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29, 31))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (Integer i : c.sideInput(view)) {
c.output(i);
@@ -387,8 +389,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of(35, new Instant(11))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (Integer i : c.sideInput(view)) {
c.output(i);
@@ -413,15 +415,15 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertFalse(c.sideInput(view).iterator().hasNext());
c.output(1);
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -438,8 +440,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
Iterator<Integer> iterator = c.sideInput(view).iterator();
while (iterator.hasNext()) {
@@ -453,7 +455,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(11);
pipeline.run();
@@ -472,11 +474,11 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
+ c.output(of(c.element(), v));
}
}
}));
@@ -500,8 +502,8 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
@@ -554,11 +556,11 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
+ c.output(of(c.element(), v));
}
}
}));
@@ -591,13 +593,13 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (Integer v :
c.sideInput(view)
.get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
+ c.output(of(c.element(), v));
}
}
}));
@@ -629,8 +631,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of(1 /* size */, new Instant(16))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<Integer, KV<String, Integer>>() {
- @Override
+ new DoFn<Integer, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(),
c.sideInput(view).size());
@@ -674,13 +676,13 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (Integer v :
c.sideInput(view)
.get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
+ c.output(of(c.element(), v));
}
}
}));
@@ -704,8 +706,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -714,7 +716,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -734,8 +736,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -744,7 +746,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -763,8 +765,8 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
c.sideInput(view).clear();
@@ -792,7 +794,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
pipeline.run();
@@ -811,11 +813,11 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(
- KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+ of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
}
}));
@@ -838,8 +840,8 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
@@ -870,11 +872,11 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(
- KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+ KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
}
}));
@@ -906,8 +908,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(
c.element(),
@@ -943,8 +945,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of(1 /* size */, new Instant(16))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<Integer, KV<String, Integer>>() {
- @Override
+ new DoFn<Integer, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(),
c.sideInput(view).size());
@@ -988,10 +990,10 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
- c.output(KV.of(
+ c.output(KV.of(
c.element(),
c.sideInput(view).get(
c.element().substring(0, 1))));
@@ -1017,8 +1019,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -1027,7 +1029,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -1046,8 +1048,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
.apply("OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
assertTrue(c.sideInput(view).isEmpty());
assertTrue(c.sideInput(view).entrySet().isEmpty());
@@ -1056,7 +1058,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -1080,8 +1082,8 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(
KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
@@ -1111,8 +1113,8 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
c.sideInput(view).clear();
@@ -1139,7 +1141,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that OldDoFn executes.
+ // Pass at least one value through to guarantee that DoFn executes.
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
pipeline.run();
@@ -1158,8 +1160,8 @@ public class ViewTest implements Serializable {
PCollection<KV<String, Integer>> output =
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply("Output",
- ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV
.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
@@ -1193,8 +1195,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<String, String>() {
- @Override
+ new DoFn<String, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
@@ -1226,8 +1228,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<String, String>() {
- @Override
+ new DoFn<String, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
@@ -1257,8 +1259,8 @@ public class ViewTest implements Serializable {
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
- new OldDoFn<String, String>() {
- @Override
+ new DoFn<String, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
@@ -1287,8 +1289,8 @@ public class ViewTest implements Serializable {
p.apply("CreateMainInput", Create.of(""))
.apply(
"OutputMainAndSideInputs",
- ParDo.withSideInputs(view).of(new OldDoFn<String, String>() {
- @Override
+ ParDo.withSideInputs(view).of(new DoFn<String, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
}
@@ -1305,8 +1307,8 @@ public class ViewTest implements Serializable {
Pipeline pipeline = TestPipeline.create();
final PCollectionView<Iterable<Integer>> view1 =
pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply("OutputOneInteger", ParDo.of(new OldDoFn<Void, Integer>() {
- @Override
+ .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(17);
}
@@ -1317,8 +1319,8 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply(
"OutputSideInput",
- ParDo.withSideInputs(view1).of(new OldDoFn<Void, Iterable<Integer>>() {
- @Override
+ ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view1));
}
@@ -1328,8 +1330,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply("ReadIterableSideInput",
- ParDo.withSideInputs(view2).of(new OldDoFn<Void, Integer>() {
- @Override
+ ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (Iterable<Integer> input : c.sideInput(view2)) {
for (Integer i : input) {
[12/13] incubator-beam git commit: Port Flink integration tests to
new DoFn
Posted by ke...@apache.org.
Port Flink integration tests to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ae1f6d18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ae1f6d18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ae1f6d18
Branch: refs/heads/master
Commit: ae1f6d181ebe3c0bdffc35c833a6fdc858937d6c
Parents: 879f18f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:17:20 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/runners/flink/ReadSourceITCase.java | 6 +++---
.../apache/beam/runners/flink/ReadSourceStreamingITCase.java | 8 +++++---
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae1f6d18/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index ca70096..516c7ba 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -72,8 +72,8 @@ public class ReadSourceITCase extends JavaProgramTestBase {
PCollection<String> result = p
.apply(CountingInput.upTo(10))
- .apply(ParDo.of(new OldDoFn<Long, String>() {
- @Override
+ .apply(ParDo.of(new DoFn<Long, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae1f6d18/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index bc69f34..ea58d0d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -20,9 +20,11 @@ package org.apache.beam.runners.flink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+
import com.google.common.base.Joiner;
+
import org.apache.flink.streaming.util.StreamingProgramTestBase;
/**
@@ -59,8 +61,8 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
p
.apply(CountingInput.upTo(10))
- .apply(ParDo.of(new OldDoFn<Long, String>() {
- @Override
+ .apply(ParDo.of(new DoFn<Long, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().toString());
}
[13/13] incubator-beam git commit: This closes #797
Posted by ke...@apache.org.
This closes #797
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/574c3777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/574c3777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/574c3777
Branch: refs/heads/master
Commit: 574c3777ddb6d9e8f5afb39e7ec14913ea52a8e8
Parents: bb00810 f5df358
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Aug 8 13:40:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 13:40:21 2016 -0700
----------------------------------------------------------------------
.../beam/runners/flink/examples/TFIDF.java | 28 +--
.../beam/runners/flink/examples/WordCount.java | 5 +-
.../flink/examples/streaming/AutoComplete.java | 37 ++--
.../flink/examples/streaming/JoinExamples.java | 14 +-
.../examples/streaming/KafkaIOExamples.java | 7 +-
.../KafkaWindowedWordCountExample.java | 10 +-
.../examples/streaming/WindowedWordCount.java | 10 +-
.../beam/runners/flink/ReadSourceITCase.java | 6 +-
.../flink/ReadSourceStreamingITCase.java | 8 +-
.../apache/beam/runners/spark/TfIdfTest.java | 22 +--
.../spark/translation/CombinePerKeyTest.java | 6 +-
.../translation/MultiOutputWordCountTest.java | 10 +-
.../spark/translation/SerializationTest.java | 10 +-
.../streaming/KafkaStreamingTest.java | 6 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 26 +--
.../org/apache/beam/sdk/transforms/Combine.java | 20 +-
.../org/apache/beam/sdk/transforms/Filter.java | 4 +-
.../beam/sdk/transforms/windowing/Window.java | 7 +-
.../org/apache/beam/sdk/util/Reshuffle.java | 6 +-
.../java/org/apache/beam/sdk/io/WriteTest.java | 22 ++-
.../apache/beam/sdk/transforms/DoFnTest.java | 29 +--
.../apache/beam/sdk/transforms/ViewTest.java | 192 ++++++++++---------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 +-
.../sdk/transforms/WithTimestampsJava8Test.java | 6 +-
24 files changed, 248 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
[05/13] incubator-beam git commit: Port Reshuffle to new DoFn
Posted by ke...@apache.org.
Port Reshuffle to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf21a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf21a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf21a5c
Branch: refs/heads/master
Commit: ecf21a5cc177c39e515e4c78e16b579ac298c999
Parents: d798413
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:47:23 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf21a5c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 66c7cc0..ad33a25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -70,8 +70,8 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// set allowed lateness.
.setWindowingStrategyInternal(originalStrategy)
.apply("ExpandIterable", ParDo.of(
- new OldDoFn<KV<K, Iterable<V>>, KV<K, V>>() {
- @Override
+ new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
K key = c.element().getKey();
for (V value : c.element().getValue()) {
[02/13] incubator-beam git commit: Port BigQueryIO to new DoFn
Posted by ke...@apache.org.
Port BigQueryIO to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6395e9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6395e9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6395e9d
Branch: refs/heads/master
Commit: d6395e9d45dcbeb9b3d3e2f8214a49866622b9cf
Parents: 87313f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:26:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 ++++++++++----------
1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6395e9d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ed2c32e..36e09f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -1785,7 +1784,7 @@ public class BigQueryIO {
return PDone.in(input.getPipeline());
}
- private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> {
+ private class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
private TableRowWriter writer = null;
private final String tempFilePrefix;
@@ -1793,7 +1792,7 @@ public class BigQueryIO {
this.tempFilePrefix = tempFilePrefix;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
if (writer == null) {
writer = new TableRowWriter(tempFilePrefix);
@@ -1806,7 +1805,7 @@ public class BigQueryIO {
// Discard write result and close the write.
try {
writer.close();
- // The writer does not need to be reset, as this OldDoFn cannot be reused.
+ // The writer does not need to be reset, as this DoFn cannot be reused.
} catch (Exception closeException) {
// Do not mask the exception that caused the write to fail.
e.addSuppressed(closeException);
@@ -1815,7 +1814,7 @@ public class BigQueryIO {
}
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
if (writer != null) {
c.output(writer.close());
@@ -1959,7 +1958,7 @@ public class BigQueryIO {
/**
* Partitions temporary files based on number of files and file sizes.
*/
- static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> {
+ static class WritePartition extends DoFn<String, KV<Long, List<String>>> {
private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
private TupleTag<KV<Long, List<String>>> singlePartitionTag;
@@ -1973,7 +1972,7 @@ public class BigQueryIO {
this.singlePartitionTag = singlePartitionTag;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
if (results.isEmpty()) {
@@ -2015,7 +2014,7 @@ public class BigQueryIO {
/**
* Writes partitions to BigQuery tables.
*/
- static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> {
+ static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
private final boolean singlePartition;
private final BigQueryServices bqServices;
private final String jobIdToken;
@@ -2044,7 +2043,7 @@ public class BigQueryIO {
this.createDisposition = createDisposition;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
@@ -2149,7 +2148,7 @@ public class BigQueryIO {
/**
* Copies temporary tables to destination table.
*/
- static class WriteRename extends OldDoFn<String, Void> {
+ static class WriteRename extends DoFn<String, Void> {
private final BigQueryServices bqServices;
private final String jobIdToken;
private final String jsonTableRef;
@@ -2172,7 +2171,7 @@ public class BigQueryIO {
this.tempTablesView = tempTablesView;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
[08/13] incubator-beam git commit: Remove references to OldDoFn from
DoFnTest
Posted by ke...@apache.org.
Remove references to OldDoFn from DoFnTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32f84bb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32f84bb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32f84bb4
Branch: refs/heads/master
Commit: 32f84bb4d442a9b2060a4a5fbdc77d14d6b0976b
Parents: 86291de
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:54:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/DoFnTest.java | 29 +-------------------
1 file changed, 1 insertion(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32f84bb4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index c7e8972..710e4ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -22,10 +22,6 @@ import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.NeedsRunner;
@@ -140,29 +136,6 @@ public class DoFnTest implements Serializable {
}
@Test
- public void testDoFnWithContextUsingAggregators() {
- NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
- OldDoFn<Object, Object>.Context context = noOpFn.context();
-
- OldDoFn<Object, Object> fn = spy(noOpFn);
- context = spy(context);
-
- @SuppressWarnings("unchecked")
- Aggregator<Long, Long> agg = mock(Aggregator.class);
-
- Sum.SumLongFn combiner = new Sum.SumLongFn();
- Aggregator<Long, Long> delegateAggregator =
- fn.createAggregator("test", combiner);
-
- when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
- context.setupDelegateAggregators();
- delegateAggregator.addValue(1L);
-
- verify(agg).addValue(1L);
- }
-
- @Test
public void testDefaultPopulateDisplayDataImplementation() {
DoFn<String, String> fn = new DoFn<String, String>() {
};
@@ -225,7 +198,7 @@ public class DoFnTest implements Serializable {
}
/**
- * Initialize a test pipeline with the specified {@link OldDoFn}.
+ * Initialize a test pipeline with the specified {@link DoFn}.
*/
private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) {
TestPipeline pipeline = TestPipeline.create();
[03/13] incubator-beam git commit: Port most of Combine to new DoFn
Posted by ke...@apache.org.
Port most of Combine to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/331f5234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/331f5234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/331f5234
Branch: refs/heads/master
Commit: 331f523461094af666a20bd97e1e15f1dec3feba
Parents: b1db02d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:11:11 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Combine.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/331f5234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6fc2324..a825800 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1473,9 +1473,9 @@ public class Combine {
PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
.apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
- new OldDoFn<Void, OutputT>() {
- @Override
- public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) {
+ new DoFn<Void, OutputT>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator();
if (!combined.hasNext()) {
c.output(defaultValue);
@@ -2097,15 +2097,15 @@ public class Combine {
final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
PCollectionTuple split = input.apply("AddNonce", ParDo.of(
- new OldDoFn<KV<K, InputT>, KV<K, InputT>>() {
+ new DoFn<KV<K, InputT>, KV<K, InputT>>() {
transient int counter;
- @Override
+ @StartBundle
public void startBundle(Context c) {
counter = ThreadLocalRandom.current().nextInt(
Integer.MAX_VALUE);
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, InputT> kv = c.element();
int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
@@ -2135,9 +2135,9 @@ public class Combine {
.setWindowingStrategyInternal(preCombineStrategy)
.apply("PreCombineHot", Combine.perKey(hotPreCombine))
.apply("StripNonce", ParDo.of(
- new OldDoFn<KV<KV<K, Integer>, AccumT>,
+ new DoFn<KV<KV<K, Integer>, AccumT>,
KV<K, InputOrAccum<InputT, AccumT>>>() {
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(
c.element().getKey().getKey(),
@@ -2151,8 +2151,8 @@ public class Combine {
.get(cold)
.setCoder(inputCoder)
.apply("PrepareCold", ParDo.of(
- new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
- @Override
+ new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().getKey(),
InputOrAccum.<InputT, AccumT>input(c.element().getValue())));