You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 03:10:50 UTC
[8/9] incubator-beam git commit: Port easy Java SDK tests to new DoFn
Port easy Java SDK tests to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1959ddbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1959ddbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1959ddbe
Branch: refs/heads/master
Commit: 1959ddbedb2ad61824bf28e1e9139cc677a2aaf5
Parents: f5011e5
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/PipelineTest.java | 8 ++---
.../apache/beam/sdk/coders/AvroCoderTest.java | 6 ++--
.../beam/sdk/coders/CoderRegistryTest.java | 10 +++---
.../beam/sdk/coders/SerializableCoderTest.java | 10 +++---
.../apache/beam/sdk/io/CountingInputTest.java | 6 ++--
.../apache/beam/sdk/io/CountingSourceTest.java | 6 ++--
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 6 ++--
.../sdk/transforms/ApproximateUniqueTest.java | 6 ++--
.../beam/sdk/transforms/CombineFnsTest.java | 4 +--
.../apache/beam/sdk/transforms/CombineTest.java | 18 +++++------
.../apache/beam/sdk/transforms/CreateTest.java | 4 +--
.../apache/beam/sdk/transforms/FlattenTest.java | 8 ++---
.../beam/sdk/transforms/GroupByKeyTest.java | 8 ++---
.../beam/sdk/transforms/WithTimestampsTest.java | 12 +++----
.../display/DisplayDataEvaluatorTest.java | 10 +++---
.../sdk/transforms/display/DisplayDataTest.java | 6 ++--
.../sdk/transforms/join/CoGroupByKeyTest.java | 34 ++++++++++----------
.../sdk/transforms/windowing/WindowTest.java | 10 +++---
.../sdk/transforms/windowing/WindowingTest.java | 23 ++++++-------
.../beam/sdk/values/PCollectionTupleTest.java | 6 ++--
.../apache/beam/sdk/values/TypedPValueTest.java | 10 +++---
21 files changed, 106 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 5137031..8b86499 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.UserCodeException;
@@ -146,9 +146,9 @@ public class PipelineTest {
private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
final String suffix) {
- return ParDo.of(new OldDoFn<String, String>() {
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c) {
+ return ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(DoFn<String, String>.ProcessContext c) {
c.output(c.element() + suffix);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 41d0932..3b13e35 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -134,8 +134,8 @@ public class AvroCoderTest {
}
}
- private static class GetTextFn extends OldDoFn<Pojo, String> {
- @Override
+ private static class GetTextFn extends DoFn<Pojo, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().text);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 35ec6c6..da15405 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
@@ -366,8 +366,8 @@ public class CoderRegistryTest {
private static class PTransformOutputingMySerializableGeneric
extends PTransform<PCollection<String>, PCollection<KV<String, MySerializableGeneric<String>>>> {
- private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<String>>> {
- @Override
+ private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<String>>> {
+ @ProcessElement
public void processElement(ProcessContext c) { }
}
@@ -430,8 +430,8 @@ public class CoderRegistryTest {
PCollection<String>,
PCollection<KV<String, MySerializableGeneric<T>>>> {
- private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<T>>> {
- @Override
+ private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<T>>> {
+ @ProcessElement
public void processElement(ProcessContext c) { }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index 3e7fd50..b5465fa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.CoderUtils;
@@ -82,15 +82,15 @@ public class SerializableCoderTest implements Serializable {
}
}
- static class StringToRecord extends OldDoFn<String, MyRecord> {
- @Override
+ static class StringToRecord extends DoFn<String, MyRecord> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(new MyRecord(c.element()));
}
}
- static class RecordToString extends OldDoFn<MyRecord, String> {
- @Override
+ static class RecordToString extends DoFn<MyRecord, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().value);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 95f7454..4ec2c9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -120,8 +120,8 @@ public class CountingInputTest {
assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
}
- private static class ElementValueDiff extends OldDoFn<Long, Long> {
- @Override
+ private static class ElementValueDiff extends DoFn<Long, Long> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 45f636f..0bd91c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -159,8 +159,8 @@ public class CountingSourceTest {
p.run();
}
- private static class ElementValueDiff extends OldDoFn<Long, Long> {
- @Override
+ private static class ElementValueDiff extends DoFn<Long, Long> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index f8592c9..db03a5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest {
private static final String ID_LABEL = "id";
private static final int NUM_SHARDS = 10;
- private static class Stamp extends OldDoFn<String, String> {
- @Override
+ private static class Stamp extends DoFn<String, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 5c8732f..7b6d671 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -54,7 +54,7 @@ import java.util.List;
*/
@RunWith(JUnit4.class)
public class ApproximateUniqueTest implements Serializable {
- // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses
+ // implements Serializable just to make it easy to use anonymous inner DoFn subclasses
@Test
public void testEstimationErrorToSampleSize() {
@@ -223,8 +223,8 @@ public class ApproximateUniqueTest implements Serializable {
.apply(View.<Long>asSingleton());
PCollection<KV<Long, Long>> approximateAndExact = approximate
- .apply(ParDo.of(new OldDoFn<Long, KV<Long, Long>>() {
- @Override
+ .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), c.sideInput(exact)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index d6bf826..95ba1aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -461,7 +461,7 @@ public class CombineFnsTest {
}
private static class ExtractResultDoFn
- extends OldDoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
+ extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
private final TupleTag<Integer> maxIntTag;
private final TupleTag<UserString> concatStringTag;
@@ -471,7 +471,7 @@ public class CombineFnsTest {
this.concatStringTag = concatStringTag;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
UserString userString = c.element().getValue().get(concatStringTag);
KV<Integer, String> value = KV.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index cb9928e..6421b3b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -117,7 +117,7 @@ public class CombineTest implements Serializable {
1, 1, 2, 3, 5, 8, 13, 21, 34, 55
};
- @Mock private OldDoFn<?, ?>.ProcessContext processContext;
+ @Mock private DoFn<?, ?>.ProcessContext processContext;
PCollection<KV<String, Integer>> createInput(Pipeline p,
KV<String, Integer>[] table) {
@@ -372,8 +372,8 @@ public class CombineTest implements Serializable {
pipeline.run();
}
- private static class FormatPaneInfo extends OldDoFn<Integer, String> {
- @Override
+ private static class FormatPaneInfo extends DoFn<Integer, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + ": " + c.pane().isLast());
}
@@ -560,8 +560,8 @@ public class CombineTest implements Serializable {
pipeline.run();
}
- private static class GetLast extends OldDoFn<Integer, Integer> {
- @Override
+ private static class GetLast extends DoFn<Integer, Integer> {
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.pane().isLast()) {
c.output(c.element());
@@ -653,8 +653,8 @@ public class CombineTest implements Serializable {
PCollection<Integer> output = pipeline
.apply("CreateVoidMainInput", Create.of((Void) null))
- .apply("OutputSideInput", ParDo.of(new OldDoFn<Void, Integer>() {
- @Override
+ .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
@@ -1176,8 +1176,8 @@ public class CombineTest implements Serializable {
}
private static <T> PCollection<T> copy(PCollection<T> pc, final int n) {
- return pc.apply(ParDo.of(new OldDoFn<T, T>() {
- @Override
+ return pc.apply(ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
for (int i = 0; i < n; i++) {
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index cf65423..9db0136 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -229,8 +229,8 @@ public class CreateTest {
p.run();
}
- private static class PrintTimestamps extends OldDoFn<String, String> {
- @Override
+ private static class PrintTimestamps extends DoFn<String, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + ":" + c.timestamp().getMillis());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index b81eedb..604536b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -130,8 +130,8 @@ public class FlattenTest implements Serializable {
PCollection<String> output = p
.apply(Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() {
- @Override
+ .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (String side : c.sideInput(view)) {
c.output(side);
@@ -339,8 +339,8 @@ public class FlattenTest implements Serializable {
/////////////////////////////////////////////////////////////////////////////
- private static class IdentityFn<T> extends OldDoFn<T, T> {
- @Override
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 15c3ba8..afe460f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -371,14 +371,14 @@ public class GroupByKeyTest {
pipeline.run();
}
- private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> {
+ private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> {
private final Instant timestamp;
public AssertTimestamp(Instant timestamp) {
this.timestamp = timestamp;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat(c.timestamp(), equalTo(timestamp));
}
@@ -506,9 +506,9 @@ public class GroupByKeyTest {
* Creates a KV that wraps the original KV together with a random key.
*/
static class AssignRandomKey
- extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
+ extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index d2ba452..e381470 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable {
.apply(WithTimestamps.of(timestampFn));
PCollection<KV<String, Instant>> timestampedVals =
- timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
- @Override
- public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+ timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ @ProcessElement
+ public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
@@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable {
WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L)));
PCollection<KV<String, Instant>> timestampedVals =
- timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
- @Override
- public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+ timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ @ProcessElement
+ public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index c1848c6..e233114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
@@ -50,8 +50,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
new PTransform<PCollection<String>, POutput> () {
@Override
public PCollection<String> apply(PCollection<String> input) {
- return input.apply(ParDo.of(new OldDoFn<String, String>() {
- @Override
+ return input.apply(ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
@@ -79,8 +79,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
@Test
public void testPrimitiveTransform() {
PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of(
- new OldDoFn<Integer, Integer>() {
- @Override
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 517f968..e2f38b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -1053,8 +1053,8 @@ public class DisplayDataTest implements Serializable {
private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> input) {
- return input.apply(ParDo.of(new OldDoFn<T, T>() {
- @Override
+ return input.apply(ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 97667a3..c6f82ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -29,9 +29,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -84,10 +83,11 @@ public class CoGroupByKeyTest implements Serializable {
input = p.apply("Create" + name, Create.timestamped(list, timestamps)
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
}
- return input
- .apply("Identity" + name, ParDo.of(new OldDoFn<KV<Integer, String>,
- KV<Integer, String>>() {
- @Override
+ return input.apply(
+ "Identity" + name,
+ ParDo.of(
+ new DoFn<KV<Integer, String>, KV<Integer, String>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
@@ -313,11 +313,11 @@ public class CoGroupByKeyTest implements Serializable {
}
/**
- * A OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the
- * results of a CoGroupByKey.
+ * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the results of a
+ * CoGroupByKey.
*/
- private static class ClickOfPurchaseFn extends
- OldDoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess {
+ private static class ClickOfPurchaseFn
+ extends DoFn<KV<Integer, CoGbkResult>, KV<String, String>> {
private final TupleTag<String> clicksTag;
private final TupleTag<String> purchasesTag;
@@ -329,9 +329,9 @@ public class CoGroupByKeyTest implements Serializable {
this.purchasesTag = purchasesTag;
}
- @Override
- public void processElement(ProcessContext c) {
- BoundedWindow w = c.window();
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ BoundedWindow w = window;
KV<Integer, CoGbkResult> e = c.element();
CoGbkResult row = e.getValue();
Iterable<String> clicks = row.getAll(clicksTag);
@@ -347,11 +347,11 @@ public class CoGroupByKeyTest implements Serializable {
/**
- * A OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the
+ * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the
* results of a CoGroupByKey.
*/
private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends
- OldDoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
+ DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
private final TupleTag<String> purchasesTag;
private final TupleTag<String> addressesTag;
@@ -367,7 +367,7 @@ public class CoGroupByKeyTest implements Serializable {
this.namesTag = namesTag;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<Integer, CoGbkResult> e = c.element();
CoGbkResult row = e.getValue();
@@ -401,7 +401,7 @@ public class CoGroupByKeyTest implements Serializable {
}
/**
- * Tests that the consuming OldDoFn
+ * Tests that the consuming DoFn
* (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected.
*/
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 27d2539..c583860 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -199,8 +199,8 @@ public class WindowTest implements Serializable {
.apply(GroupByKey.<Integer, String>create())
.apply(
ParDo.of(
- new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
- @Override
+ new DoFn<KV<Integer, Iterable<String>>, Void>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat(
c.timestamp(),
@@ -231,8 +231,8 @@ public class WindowTest implements Serializable {
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
.apply(GroupByKey.<Integer, String>create())
- .apply(ParDo.of(new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
- @Override
+ .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 622a277..159e700 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -26,9 +26,8 @@ import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -58,12 +57,14 @@ public class WindowingTest implements Serializable {
private static class WindowedCount extends PTransform<PCollection<String>, PCollection<String>> {
- private final class FormatCountsDoFn
- extends OldDoFn<KV<String, Long>, String> implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey() + ":" + c.element().getValue()
- + ":" + c.timestamp().getMillis() + ":" + c.window());
+ private final class FormatCountsDoFn extends DoFn<KV<String, Long>, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ c.output(
+ c.element().getKey()
+ + ":" + c.element().getValue()
+ + ":" + c.timestamp().getMillis()
+ + ":" + window);
}
}
private WindowFn<? super String, ?> windowFn;
@@ -234,9 +235,9 @@ public class WindowingTest implements Serializable {
p.run();
}
- /** A OldDoFn that tokenizes lines of text into individual words. */
- static class ExtractWordsWithTimestampsFn extends OldDoFn<String, String> {
- @Override
+ /** A DoFn that tokenizes lines of text into individual words. */
+ static class ExtractWordsWithTimestampsFn extends DoFn<String, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z0-9']+");
if (words.length == 2) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 547c778..13218b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -75,8 +75,8 @@ public final class PCollectionTupleTest implements Serializable {
.apply(Create.of(inputs));
PCollectionTuple outputs = mainInput.apply(ParDo
- .of(new OldDoFn<Integer, Integer>() {
- @Override
+ .of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.sideOutput(sideOutputTag, c.element());
}})
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index c525cf1..287223f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.Rule;
@@ -44,9 +44,9 @@ public class TypedPValueTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
- private static class IdentityDoFn extends OldDoFn<Integer, Integer> {
+ private static class IdentityDoFn extends DoFn<Integer, Integer> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
@@ -129,9 +129,9 @@ public class TypedPValueTest {
static class EmptyClass {
}
- private static class EmptyClassDoFn extends OldDoFn<Integer, EmptyClass> {
+ private static class EmptyClassDoFn extends DoFn<Integer, EmptyClass> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(new EmptyClass());
}