You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 03:10:43 UTC
[1/9] incubator-beam git commit: Port easy transforms to new DoFn
Repository: incubator-beam
Updated Branches:
refs/heads/master fcf6b1d34 -> 8daf518bc
Port easy transforms to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47341e11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47341e11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47341e11
Branch: refs/heads/master
Commit: 47341e113334827101ddbf775c69ae34d178cd8f
Parents: 269fbf3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:27:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/Count.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/Create.java | 4 ++--
.../apache/beam/sdk/transforms/FlatMapElements.java | 4 ++--
.../org/apache/beam/sdk/transforms/Flatten.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/Keys.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/KvSwap.java | 4 ++--
.../org/apache/beam/sdk/transforms/MapElements.java | 4 ++--
.../org/apache/beam/sdk/transforms/Partition.java | 4 ++--
.../beam/sdk/transforms/RemoveDuplicates.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/Sample.java | 6 +++---
.../java/org/apache/beam/sdk/transforms/Values.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/View.java | 8 ++++----
.../org/apache/beam/sdk/transforms/WithKeys.java | 4 ++--
.../apache/beam/sdk/transforms/WithTimestamps.java | 6 +++---
.../beam/sdk/transforms/join/CoGroupByKey.java | 16 ++++++++--------
15 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 7601ffc..ac59c76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,8 +107,8 @@ public class Count {
public PCollection<KV<T, Long>> apply(PCollection<T> input) {
return
input
- .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
- @Override
+ .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), (Void) null));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index fb7f784..08d0a7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -486,8 +486,8 @@ public class Create<T> {
this.elementCoder = elementCoder;
}
- private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> {
- @Override
+ private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index b48da38..694592e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -133,9 +133,9 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
@Override
public PCollection<OutputT> apply(PCollection<InputT> input) {
- return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
+ return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
private static final long serialVersionUID = 0L;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for (OutputT element : fn.apply(c.element())) {
c.output(element);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 53e898e..7e09d7e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -174,8 +174,8 @@ public class Flatten {
Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
return in.apply("FlattenIterables", ParDo.of(
- new OldDoFn<Iterable<T>, T>() {
- @Override
+ new DoFn<Iterable<T>, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (T i : c.element()) {
c.output(i);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index c8cbce8..5ac1866 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,8 +58,8 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
@Override
public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
return
- in.apply("Keys", ParDo.of(new OldDoFn<KV<K, ?>, K>() {
- @Override
+ in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getKey());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 430d37b..d4386d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,8 +62,8 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
@Override
public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
return
- in.apply("KvSwap", ParDo.of(new OldDoFn<KV<K, V>, KV<V, K>>() {
- @Override
+ in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, V> e = c.element();
c.output(KV.of(e.getValue(), e.getKey()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index c83c39f..b7b9a5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -104,8 +104,8 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
@Override
public PCollection<OutputT> apply(PCollection<InputT> input) {
- return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
- @Override
+ return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(fn.apply(c.element()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 2ddcc29..05c9470 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -134,7 +134,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
this.partitionDoFn = partitionDoFn;
}
- private static class PartitionDoFn<X> extends OldDoFn<X, Void> {
+ private static class PartitionDoFn<X> extends DoFn<X, Void> {
private final int numPartitions;
private final PartitionFn<? super X> partitionFn;
private final TupleTagList outputTags;
@@ -163,7 +163,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
return outputTags;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
X input = c.element();
int partition = partitionFn.partitionFor(input, numPartitions);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index d82c457..bba4b51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,8 +85,8 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
@Override
public PCollection<T> apply(PCollection<T> in) {
return in
- .apply("CreateIndex", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
- @Override
+ .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), (Void) null));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 724b252..12ff2b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -164,9 +164,9 @@ public class Sample {
}
/**
- * A {@link OldDoFn} that returns up to limit elements from the side input PCollection.
+ * A {@link DoFn} that returns up to limit elements from the side input PCollection.
*/
- private static class SampleAnyDoFn<T> extends OldDoFn<Void, T> {
+ private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
long limit;
final PCollectionView<Iterable<T>> iterableView;
@@ -175,7 +175,7 @@ public class Sample {
this.iterableView = iterableView;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for (T i : c.sideInput(iterableView)) {
if (limit-- <= 0) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 856e32a..34342db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,8 +58,8 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
@Override
public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
return
- in.apply("Values", ParDo.of(new OldDoFn<KV<?, V>, V>() {
- @Override
+ in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 8a61637..7a97c13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -38,7 +38,7 @@ import java.util.Map;
*
* <p>When a {@link ParDo} transform is processing a main input
* element in a window {@code w} and a {@link PCollectionView} is read via
- * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
+ * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
* returned.
*
* <p>The SDK supports viewing a {@link PCollection}, per window, as a single value,
@@ -118,7 +118,7 @@ import java.util.Map;
*
* PCollection PageVisits = urlVisits
* .apply(ParDo.withSideInputs(urlToPage)
- * .of(new OldDoFn<UrlVisit, PageVisit>() {
+ * .of(new DoFn<UrlVisit, PageVisit>() {
* {@literal @}Override
* void processElement(ProcessContext context) {
* UrlVisit urlVisit = context.element();
@@ -154,11 +154,11 @@ public class View {
*
* <p>If the input {@link PCollection} is empty,
* throws {@link java.util.NoSuchElementException} in the consuming
- * {@link OldDoFn}.
+ * {@link DoFn}.
*
* <p>If the input {@link PCollection} contains more than one
* element, throws {@link IllegalArgumentException} in the
- * consuming {@link OldDoFn}.
+ * consuming {@link DoFn}.
*/
public static <T> AsSingleton<T> asSingleton() {
return new AsSingleton<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 37d45aa..2a44963 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,8 +113,8 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
@Override
public PCollection<KV<K, V>> apply(PCollection<V> in) {
PCollection<KV<K, V>> result =
- in.apply("AddKeys", ParDo.of(new OldDoFn<V, KV<K, V>>() {
- @Override
+ in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(fn.apply(c.element()),
c.element()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 41b549b..7b395f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -92,7 +92,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* Returns the allowed timestamp skew duration, which is the maximum
* duration that timestamps can be shifted backwards from the timestamp of the input element.
*
- * @see OldDoFn#getAllowedTimestampSkew()
+ * @see DoFn#getAllowedTimestampSkew()
*/
public Duration getAllowedTimestampSkew() {
return allowedTimestampSkew;
@@ -105,7 +105,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
.setTypeDescriptorInternal(input.getTypeDescriptor());
}
- private static class AddTimestampsDoFn<T> extends OldDoFn<T, T> {
+ private static class AddTimestampsDoFn<T> extends DoFn<T, T> {
private final SerializableFunction<T, Instant> fn;
private final Duration allowedTimestampSkew;
@@ -114,7 +114,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
this.allowedTimestampSkew = allowedTimestampSkew;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
Instant timestamp = fn.apply(c.element());
checkNotNull(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index 1bd9f4a..cb06f95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.transforms.join;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
@@ -57,7 +57,7 @@ import java.util.List;
*
* PCollection<T> finalResultCollection =
* coGbkResultCollection.apply(ParDo.of(
- * new OldDoFn<KV<K, CoGbkResult>, T>() {
+ * new DoFn<KV<K, CoGbkResult>, T>() {
* @Override
* public void processElement(ProcessContext c) {
* KV<K, CoGbkResult> e = c.element();
@@ -167,12 +167,12 @@ public class CoGroupByKey<K> extends
}
/**
- * A OldDoFn to construct a UnionTable (i.e., a
+ * A DoFn to construct a UnionTable (i.e., a
* {@code PCollection<KV<K, RawUnionValue>>} from a
* {@code PCollection<KV<K, V>>}.
*/
private static class ConstructUnionTableFn<K, V> extends
- OldDoFn<KV<K, V>, KV<K, RawUnionValue>> {
+ DoFn<KV<K, V>, KV<K, RawUnionValue>> {
private final int index;
@@ -180,7 +180,7 @@ public class CoGroupByKey<K> extends
this.index = index;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, ?> e = c.element();
c.output(KV.of(e.getKey(), new RawUnionValue(index, e.getValue())));
@@ -188,11 +188,11 @@ public class CoGroupByKey<K> extends
}
/**
- * A OldDoFn to construct a CoGbkResult from an input grouped union
+ * A DoFn to construct a CoGbkResult from an input grouped union
* table.
*/
private static class ConstructCoGbkResultFn<K>
- extends OldDoFn<KV<K, Iterable<RawUnionValue>>,
+ extends DoFn<KV<K, Iterable<RawUnionValue>>,
KV<K, CoGbkResult>> {
private final CoGbkResultSchema schema;
@@ -201,7 +201,7 @@ public class CoGroupByKey<K> extends
this.schema = schema;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, Iterable<RawUnionValue>> e = c.element();
c.output(KV.of(e.getKey(), new CoGbkResult(schema, e.getValue())));
[4/9] incubator-beam git commit: Correctly determine if DoFn has an anonymous class in ParDo
Posted by ke...@apache.org.
Correctly determine if DoFn has an anonymous class in ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b186529
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b186529
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b186529
Branch: refs/heads/master
Commit: 0b1865295cb89d88878d0a021df103ed45240924
Parents: fcf6b1d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 14:54:56 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0b186529/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index bb1af9c..91f6203 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -958,7 +958,7 @@ public class ParDo {
@Override
protected String getKindString() {
Class<?> clazz = DoFnReflector.getDoFnClass(fn);
- if (fn.getClass().isAnonymousClass()) {
+ if (clazz.isAnonymousClass()) {
return "AnonymousParMultiDo";
} else {
return String.format("ParMultiDo(%s)", StringUtils.approximateSimpleName(clazz));
[6/9] incubator-beam git commit: Port PAssert to new DoFn
Posted by ke...@apache.org.
Port PAssert to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef5e31f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef5e31f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef5e31f8
Branch: refs/heads/master
Commit: ef5e31f8b79dcedf8feb4bba0e313bfcf330ab1e
Parents: 1959ddb
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/testing/PAssert.java | 39 ++++++++++----------
1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef5e31f8/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 80340c2..e07ee3d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -33,11 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -762,8 +761,8 @@ public class PAssert {
.apply("RewindowActuals", rewindowActuals.<T>windowActuals())
.apply(
ParDo.of(
- new OldDoFn<T, T>() {
- @Override
+ new DoFn<T, T>() {
+ @ProcessElement
public void processElement(ProcessContext context) throws CoderException {
context.output(CoderUtils.clone(coder, context.element()));
}
@@ -884,8 +883,8 @@ public class PAssert {
}
}
- private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> {
- @Override
+ private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(Iterables.concat(c.element()));
}
@@ -995,13 +994,13 @@ public class PAssert {
}
/**
- * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
* {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
*
* <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
* null values.
*/
- private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> {
+ private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1015,7 +1014,7 @@ public class PAssert {
this.actual = actual;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
ActualT actualContents = c.sideInput(actual);
@@ -1030,13 +1029,13 @@ public class PAssert {
}
/**
- * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
* the single iterable element of the input {@link PCollection} and adjusts counters and
* thrown exceptions for use in testing.
*
* <p>The singleton property is presumed, not enforced.
*/
- private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> {
+ private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1047,7 +1046,7 @@ public class PAssert {
this.checkerFn = checkerFn;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
doChecks(c.element(), checkerFn, success, failure);
@@ -1061,14 +1060,14 @@ public class PAssert {
}
/**
- * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of
+ * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
* the single item contained within the single iterable on input and
* adjusts counters and thrown exceptions for use in testing.
*
* <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
* each input element must be a singleton iterable, or this will fail.
*/
- private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> {
+ private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
private final SerializableFunction<ActualT, Void> checkerFn;
private final Aggregator<Integer, Integer> success =
createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -1079,7 +1078,7 @@ public class PAssert {
this.checkerFn = checkerFn;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
try {
ActualT actualContents = Iterables.getOnlyElement(c.element());
@@ -1310,7 +1309,7 @@ public class PAssert {
}
/**
- * A OldDoFn that filters elements based on their presence in a static collection of windows.
+ * A DoFn that filters elements based on their presence in a static collection of windows.
*/
private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final StaticWindows windows;
@@ -1324,10 +1323,10 @@ public class PAssert {
return input.apply("FilterWindows", ParDo.of(new Fn()));
}
- private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- if (windows.getWindows().contains(c.window())) {
+ private class Fn extends DoFn<T, T> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ if (windows.getWindows().contains(window)) {
c.output(c.element());
}
}
[5/9] incubator-beam git commit: Port mentions of OldDoFn in PipelineOptions
Posted by ke...@apache.org.
Port mentions of OldDoFn in PipelineOptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5011e5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5011e5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5011e5c
Branch: refs/heads/master
Commit: f5011e5c62cb00fb4d8a91bd7d55d5083789a307
Parents: 620bd99
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 19:56:33 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/options/PipelineOptions.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5011e5c/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index 365f668..4595fc8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -22,8 +22,8 @@ import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import com.google.auto.service.AutoService;
@@ -35,7 +35,6 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.lang.reflect.Proxy;
import java.util.ServiceLoader;
-
import javax.annotation.concurrent.ThreadSafe;
/**
@@ -52,7 +51,7 @@ import javax.annotation.concurrent.ThreadSafe;
* and {@link PipelineOptionsFactory#as(Class)}. They can be created
* from command-line arguments with {@link PipelineOptionsFactory#fromArgs(String[])}.
* They can be converted to another type by invoking {@link PipelineOptions#as(Class)} and
- * can be accessed from within a {@link OldDoFn} by invoking
+ * can be accessed from within a {@link DoFn} by invoking
* {@link Context#getPipelineOptions()}.
*
* <p>For example:
@@ -151,7 +150,7 @@ import javax.annotation.concurrent.ThreadSafe;
* {@link PipelineOptionsFactory#withValidation()} is invoked.
*
* <p>{@link JsonIgnore @JsonIgnore} is used to prevent a property from being serialized and
- * available during execution of {@link OldDoFn}. See the Serialization section below for more
+ * available during execution of {@link DoFn}. See the Serialization section below for more
* details.
*
* <h2>Registration Of PipelineOptions</h2>
[3/9] incubator-beam git commit: Port join library to new DoFn
Posted by ke...@apache.org.
Port join library to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/620bd994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/620bd994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/620bd994
Branch: refs/heads/master
Commit: 620bd9949a6176ddd1903687fe9b8ba8c5822367
Parents: a1c06d7
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 19:55:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/extensions/joinlibrary/Join.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/620bd994/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
index 88836f9..f4e6ccb 100644
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.joinlibrary;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -59,8 +59,8 @@ public class Join {
.apply(CoGroupByKey.<K>create());
return coGbkResultCollection.apply(ParDo.of(
- new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
+ new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, CoGbkResult> e = c.element();
@@ -108,8 +108,8 @@ public class Join {
.apply(CoGroupByKey.<K>create());
return coGbkResultCollection.apply(ParDo.of(
- new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
+ new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, CoGbkResult> e = c.element();
@@ -161,8 +161,8 @@ public class Join {
.apply(CoGroupByKey.<K>create());
return coGbkResultCollection.apply(ParDo.of(
- new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
- @Override
+ new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, CoGbkResult> e = c.element();
[9/9] incubator-beam git commit: This closes #782
Posted by ke...@apache.org.
This closes #782
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8daf518b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8daf518b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8daf518b
Branch: refs/heads/master
Commit: 8daf518bccfe425082c7d0b3f31f3623ff67e000
Parents: fcf6b1d 47341e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 20:10:55 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 20:10:55 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 14 +++----
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 17 ++++-----
.../beam/sdk/io/PubsubUnboundedSource.java | 7 ++--
.../beam/sdk/options/PipelineOptions.java | 9 ++---
.../org/apache/beam/sdk/testing/PAssert.java | 39 ++++++++++----------
.../org/apache/beam/sdk/transforms/Count.java | 4 +-
.../org/apache/beam/sdk/transforms/Create.java | 4 +-
.../beam/sdk/transforms/DoFnReflector.java | 6 +++
.../beam/sdk/transforms/FlatMapElements.java | 4 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 4 +-
.../org/apache/beam/sdk/transforms/Keys.java | 4 +-
.../org/apache/beam/sdk/transforms/KvSwap.java | 4 +-
.../apache/beam/sdk/transforms/MapElements.java | 4 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 2 +-
.../apache/beam/sdk/transforms/Partition.java | 4 +-
.../beam/sdk/transforms/RemoveDuplicates.java | 4 +-
.../org/apache/beam/sdk/transforms/Sample.java | 6 +--
.../org/apache/beam/sdk/transforms/Values.java | 4 +-
.../org/apache/beam/sdk/transforms/View.java | 8 ++--
.../apache/beam/sdk/transforms/WithKeys.java | 4 +-
.../beam/sdk/transforms/WithTimestamps.java | 6 +--
.../beam/sdk/transforms/join/CoGroupByKey.java | 16 ++++----
.../java/org/apache/beam/sdk/PipelineTest.java | 8 ++--
.../apache/beam/sdk/coders/AvroCoderTest.java | 6 +--
.../beam/sdk/coders/CoderRegistryTest.java | 10 ++---
.../beam/sdk/coders/SerializableCoderTest.java | 10 ++---
.../apache/beam/sdk/io/CountingInputTest.java | 6 +--
.../apache/beam/sdk/io/CountingSourceTest.java | 6 +--
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 6 +--
.../sdk/transforms/ApproximateUniqueTest.java | 6 +--
.../beam/sdk/transforms/CombineFnsTest.java | 4 +-
.../apache/beam/sdk/transforms/CombineTest.java | 18 ++++-----
.../apache/beam/sdk/transforms/CreateTest.java | 4 +-
.../apache/beam/sdk/transforms/FlattenTest.java | 8 ++--
.../beam/sdk/transforms/GroupByKeyTest.java | 8 ++--
.../beam/sdk/transforms/WithTimestampsTest.java | 12 +++---
.../display/DisplayDataEvaluatorTest.java | 10 ++---
.../sdk/transforms/display/DisplayDataTest.java | 6 +--
.../sdk/transforms/join/CoGroupByKeyTest.java | 34 ++++++++---------
.../sdk/transforms/windowing/WindowTest.java | 10 ++---
.../sdk/transforms/windowing/WindowingTest.java | 23 ++++++------
.../beam/sdk/values/PCollectionTupleTest.java | 6 +--
.../apache/beam/sdk/values/TypedPValueTest.java | 10 ++---
.../beam/sdk/extensions/joinlibrary/Join.java | 14 +++----
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 +++++++++---------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 12 +++---
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 18 ++++-----
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 ++---
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 6 +--
.../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 9 ++---
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 10 ++---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 19 +++++-----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 10 ++---
54 files changed, 265 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
[2/9] incubator-beam git commit: Port easy I/O transforms to new DoFn
Posted by ke...@apache.org.
Port easy I/O transforms to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/269fbf38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/269fbf38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/269fbf38
Branch: refs/heads/master
Commit: 269fbf386454ea77845e54764a125edba7039b03
Parents: ef5e31f
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:22:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 14 ++++----
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 17 +++++----
.../beam/sdk/io/PubsubUnboundedSource.java | 7 ++--
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 +++++++++-----------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 12 +++----
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 18 +++++-----
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++---
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 6 ++--
.../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 9 +++--
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 10 +++---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 19 +++++------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 10 +++---
13 files changed, 82 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index abcf415..fadd9c7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -78,6 +78,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
@@ -2715,7 +2716,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Nullable
private PTransform<?, ?> transform;
@Nullable
- private OldDoFn<?, ?> doFn;
+ private DoFn<?, ?> doFn;
/**
* Builds an instance of this class from the overridden transform.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 1902bca..2b27175 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -709,11 +709,11 @@ public class PubsubIO {
*
* <p>Public so can be suppressed by runners.
*/
- public class PubsubBoundedReader extends OldDoFn<Void, T> {
+ public class PubsubBoundedReader extends DoFn<Void, T> {
private static final int DEFAULT_PULL_SIZE = 100;
private static final int ACK_TIMEOUT_SEC = 60;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws IOException {
try (PubsubClient pubsubClient =
FACTORY.newClient(timestampLabel, idLabel,
@@ -998,12 +998,12 @@ public class PubsubIO {
*
* <p>Public so can be suppressed by runners.
*/
- public class PubsubBoundedWriter extends OldDoFn<T, Void> {
+ public class PubsubBoundedWriter extends DoFn<T, Void> {
private static final int MAX_PUBLISH_BATCH_SIZE = 100;
private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
- @Override
+ @StartBundle
public void startBundle(Context c) throws IOException {
this.output = new ArrayList<>();
// NOTE: idLabel is ignored.
@@ -1012,7 +1012,7 @@ public class PubsubIO {
c.getPipelineOptions().as(PubsubOptions.class));
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws IOException {
// NOTE: The record id is always null.
OutgoingMessage message =
@@ -1025,7 +1025,7 @@ public class PubsubIO {
}
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws IOException {
if (!output.isEmpty()) {
publish();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 9e9536d..3014751 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -65,7 +65,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-
import javax.annotation.Nullable;
/**
@@ -78,7 +77,7 @@ import javax.annotation.Nullable;
* <li>We try to send messages in batches while also limiting send latency.
* <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
* <li>Though some background threads are used by the underlying netty system all actual Pubsub
- * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances
+ * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
* to execute concurrently and hide latency.
* <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
* to dedup messages.
@@ -155,7 +154,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
/**
* Convert elements to messages and shard them.
*/
- private static class ShardFn<T> extends OldDoFn<T, KV<Integer, OutgoingMessage>> {
+ private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
private final Aggregator<Long, Long> elementCounter =
createAggregator("elements", new Sum.SumLongFn());
private final Coder<T> elementCoder;
@@ -168,7 +167,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
this.recordIdMethod = recordIdMethod;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
elementCounter.addValue(1L);
byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
@@ -207,7 +206,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
* Publish messages to Pubsub in batches.
*/
private static class WriterFn
- extends OldDoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
+ extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
private final PubsubClientFactory pubsubFactory;
private final TopicPath topic;
private final String timestampLabel;
@@ -253,14 +252,14 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
byteCounter.addValue((long) bytes);
}
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel,
c.getPipelineOptions().as(PubsubOptions.class));
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
int bytes = 0;
@@ -285,7 +284,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
}
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
pubsubClient.close();
pubsubClient = null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index d98bd6a..f99b471 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -77,7 +77,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-
import javax.annotation.Nullable;
/**
@@ -1107,7 +1106,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
// StatsFn
// ================================================================================
- private static class StatsFn<T> extends OldDoFn<T, T> {
+ private static class StatsFn<T> extends DoFn<T, T> {
private final Aggregator<Long, Long> elementCounter =
createAggregator("elements", new Sum.SumLongFn());
@@ -1131,7 +1130,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
this.idLabel = idLabel;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
elementCounter.addValue(1L);
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2ba7562..ed2c32e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -103,7 +104,6 @@ import com.google.common.io.CountingOutputStream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.apache.avro.generic.GenericRecord;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -135,7 +135,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import javax.annotation.Nullable;
/**
@@ -334,7 +333,7 @@ public class BigQueryIO {
* <p>Each {@link TableRow} contains values indexed by column name. Here is a
* sample processing function that processes a "line" column from rows:
* <pre>{@code
- * static class ExtractWordsFn extends OldDoFn<TableRow, String> {
+ * static class ExtractWordsFn extends DoFn<TableRow, String> {
* public void processElement(ProcessContext c) {
* // Get the "line" field of the TableRow object, split it into words, and emit them.
* TableRow row = c.element();
@@ -706,8 +705,8 @@ public class BigQueryIO {
input.getPipeline()
.apply("Create(CleanupOperation)", Create.of(cleanupOperation))
.apply("Cleanup", ParDo.of(
- new OldDoFn<CleanupOperation, Void>() {
- @Override
+ new DoFn<CleanupOperation, Void>() {
+ @ProcessElement
public void processElement(ProcessContext c)
throws Exception {
c.element().cleanup(c.getPipelineOptions());
@@ -717,8 +716,8 @@ public class BigQueryIO {
return outputs.get(mainOutput);
}
- private static class IdentityFn<T> extends OldDoFn<T, T> {
- @Override
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
@@ -1271,7 +1270,7 @@ public class BigQueryIO {
* <p>Here is a sample transform that produces TableRow values containing
* "word" and "count" columns:
* <pre>{@code
- * static class FormatCountsFn extends OldDoFn<KV<String, Long>, TableRow> {
+ * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
* public void processElement(ProcessContext c) {
* TableRow row = new TableRow()
* .set("word", c.element().getKey())
@@ -2307,11 +2306,11 @@ public class BigQueryIO {
/////////////////////////////////////////////////////////////////////////////
/**
- * Implementation of OldDoFn to perform streaming BigQuery write.
+ * Implementation of DoFn to perform streaming BigQuery write.
*/
@SystemDoFnInternal
private static class StreamingWriteFn
- extends OldDoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
+ extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
/** TableSchema in JSON. Use String to make the class Serializable. */
private final String jsonTableSchema;
@@ -2339,14 +2338,14 @@ public class BigQueryIO {
}
/** Prepares a target BigQuery table. */
- @Override
+ @StartBundle
public void startBundle(Context context) {
tableRows = new HashMap<>();
uniqueIdsForTableRows = new HashMap<>();
}
/** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
- @Override
+ @ProcessElement
public void processElement(ProcessContext context) {
String tableSpec = context.element().getKey().getKey();
List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec);
@@ -2357,7 +2356,7 @@ public class BigQueryIO {
}
/** Writes the accumulated rows into BigQuery with streaming API. */
- @Override
+ @FinishBundle
public void finishBundle(Context context) throws Exception {
BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
@@ -2544,8 +2543,7 @@ public class BigQueryIO {
* id is created by concatenating this randomUUID with a sequential number.
*/
private static class TagWithUniqueIdsAndTable
- extends OldDoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>>
- implements OldDoFn.RequiresWindowAccess {
+ extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
/** TableSpec to write to. */
private final String tableSpec;
@@ -2571,18 +2569,18 @@ public class BigQueryIO {
}
- @Override
+ @StartBundle
public void startBundle(Context context) {
randomUUID = UUID.randomUUID().toString();
}
/** Tag the input with a unique id. */
- @Override
- public void processElement(ProcessContext context) throws IOException {
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
String uniqueId = randomUUID + sequenceNo++;
ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
String tableSpec = tableSpecFromWindow(
- context.getPipelineOptions().as(BigQueryOptions.class), context.window());
+ context.getPipelineOptions().as(BigQueryOptions.class), window);
// We output on keys 0-50 to ensure that there's enough batching for
// BigQuery.
context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1f77e3e..bfdf4aa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -55,7 +55,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import io.grpc.Status;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +64,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
-
import javax.annotation.Nullable;
/**
@@ -512,7 +510,7 @@ public class BigtableIO {
return new BigtableServiceImpl(options);
}
- private class BigtableWriterFn extends OldDoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+ private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
public BigtableWriterFn(String tableId, BigtableService bigtableService) {
this.tableId = checkNotNull(tableId, "tableId");
@@ -520,13 +518,13 @@ public class BigtableIO {
this.failures = new ConcurrentLinkedQueue<>();
}
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
bigtableWriter = bigtableService.openForWriting(tableId);
recordsWritten = 0;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
checkForFailures();
Futures.addCallback(
@@ -534,7 +532,7 @@ public class BigtableIO {
++recordsWritten;
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
bigtableWriter.close();
bigtableWriter = null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 6f3663a..052feb3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
@@ -478,11 +478,11 @@ public class V1Beta3 {
}
/**
- * A {@link OldDoFn} that splits a given query into multiple sub-queries, assigns them unique
+ * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
* keys and outputs them as {@link KV}.
*/
@VisibleForTesting
- static class SplitQueryFn extends OldDoFn<Query, KV<Integer, Query>> {
+ static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
private final V1Beta3Options options;
// number of splits to make for a given query
private final int numSplits;
@@ -505,13 +505,13 @@ public class V1Beta3 {
this.datastoreFactory = datastoreFactory;
}
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId);
querySplitter = datastoreFactory.getQuerySplitter();
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
int key = 1;
Query query = c.element();
@@ -559,10 +559,10 @@ public class V1Beta3 {
}
/**
- * A {@link OldDoFn} that reads entities from Datastore for each query.
+ * A {@link DoFn} that reads entities from Datastore for each query.
*/
@VisibleForTesting
- static class ReadFn extends OldDoFn<Query, Entity> {
+ static class ReadFn extends DoFn<Query, Entity> {
private final V1Beta3Options options;
private final V1Beta3DatastoreFactory datastoreFactory;
// Datastore client
@@ -578,13 +578,13 @@ public class V1Beta3 {
this.datastoreFactory = datastoreFactory;
}
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
}
/** Read and output entities for the given query. */
- @Override
+ @ProcessElement
public void processElement(ProcessContext context) throws Exception {
Query query = context.element();
String namespace = options.getNamespace();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 1ea1f94..6d6eb60 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static com.google.common.base.Preconditions.checkArgument;
+
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -64,8 +65,8 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -131,7 +132,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-
import javax.annotation.Nullable;
/**
@@ -235,7 +235,7 @@ public class BigQueryIOTest implements Serializable {
private Object[] pollJobReturns;
private String executingProject;
// Both counts will be reset back to zeros after serialization.
- // This is a work around for OldDoFn's verifyUnmodified check.
+ // This is a work around for DoFn's verifyUnmodified check.
private transient int startJobCallsCount;
private transient int pollJobStatusCallsCount;
@@ -571,8 +571,8 @@ public class BigQueryIOTest implements Serializable {
.apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation())
- .apply(ParDo.of(new OldDoFn<TableRow, String>() {
- @Override
+ .apply(ParDo.of(new DoFn<TableRow, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output((String) c.element().get("name"));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 83489a5..ee3a6f9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -108,8 +108,8 @@ public class BigtableWriteIT implements Serializable {
Pipeline p = Pipeline.create(options);
p.apply(CountingInput.upTo(numRows))
- .apply(ParDo.of(new OldDoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
- @Override
+ .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
int index = c.element().intValue();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
index daed1cb..7eaf23e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
@@ -27,7 +27,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
@@ -60,7 +60,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-
import javax.annotation.Nullable;
class V1Beta3TestUtil {
@@ -109,9 +108,9 @@ class V1Beta3TestUtil {
}
/**
- * A OldDoFn that creates entity for a long number.
+ * A DoFn that creates entity for a long number.
*/
- static class CreateEntityFn extends OldDoFn<Long, Entity> {
+ static class CreateEntityFn extends DoFn<Long, Entity> {
private final String kind;
@Nullable
private final String namespace;
@@ -124,7 +123,7 @@ class V1Beta3TestUtil {
ancestorKey = makeAncestorKey(namespace, kind, ancestor);
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index eeb02e6..557fe13 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -453,7 +453,7 @@ public class JmsIO {
checkArgument((queue != null || topic != null), "Either queue or topic is required");
}
- private static class JmsWriter extends OldDoFn<String, Void> {
+ private static class JmsWriter extends DoFn<String, Void> {
private ConnectionFactory connectionFactory;
private String queue;
@@ -469,7 +469,7 @@ public class JmsIO {
this.topic = topic;
}
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
if (producer == null) {
this.connection = connectionFactory.createConnection();
@@ -486,7 +486,7 @@ public class JmsIO {
}
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
String value = ctx.element();
@@ -499,7 +499,7 @@ public class JmsIO {
}
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
producer.close();
producer = null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2271216..2383105 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -94,7 +94,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.annotation.Nullable;
/**
@@ -550,8 +549,8 @@ public class KafkaIO {
return typedRead
.apply(begin)
.apply("Remove Kafka Metadata",
- ParDo.of(new OldDoFn<KafkaRecord<K, V>, KV<K, V>>() {
- @Override
+ ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+ @ProcessElement
public void processElement(ProcessContext ctx) {
ctx.output(ctx.element().getKV());
}
@@ -1315,8 +1314,8 @@ public class KafkaIO {
public PDone apply(PCollection<V> input) {
return input
.apply("Kafka values with default key",
- ParDo.of(new OldDoFn<V, KV<Void, V>>() {
- @Override
+ ParDo.of(new DoFn<V, KV<Void, V>>() {
+ @ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
ctx.output(KV.<Void, V>of(null, ctx.element()));
}
@@ -1326,9 +1325,9 @@ public class KafkaIO {
}
}
- private static class KafkaWriter<K, V> extends OldDoFn<KV<K, V>, Void> {
+ private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
// Producer initialization is fairly costly. Move this to future initialization api to avoid
// creating a producer for each bundle.
@@ -1341,7 +1340,7 @@ public class KafkaIO {
}
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
checkForFailures();
@@ -1351,7 +1350,7 @@ public class KafkaIO {
new SendCallback());
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
producer.flush();
producer.close();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269fbf38/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d7b1921..9a89c36 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -33,10 +33,10 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -280,8 +280,8 @@ public class KafkaIOTest {
p.run();
}
- private static class ElementValueDiff extends OldDoFn<Long, Long> {
- @Override
+ private static class ElementValueDiff extends DoFn<Long, Long> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
}
@@ -308,8 +308,8 @@ public class KafkaIOTest {
p.run();
}
- private static class RemoveKafkaMetadata<K, V> extends OldDoFn<KafkaRecord<K, V>, KV<K, V>> {
- @Override
+ private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
+ @ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
ctx.output(ctx.element().getKV());
}
[7/9] incubator-beam git commit: Propagate getAllowedTimestampSkew from DoFn to its adapter
Posted by ke...@apache.org.
Propagate getAllowedTimestampSkew from DoFn to its adapter
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1c06d71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1c06d71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1c06d71
Branch: refs/heads/master
Commit: a1c06d71876384722982ec24da1607e41af653d9
Parents: 0b18652
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 4 14:56:19 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/DoFnReflector.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1c06d71/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index 9bdfde8..c6168b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -72,6 +72,7 @@ import net.bytebuddy.jar.asm.Label;
import net.bytebuddy.jar.asm.MethodVisitor;
import net.bytebuddy.jar.asm.Opcodes;
import net.bytebuddy.matcher.ElementMatchers;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import java.io.IOException;
@@ -731,6 +732,11 @@ public abstract class DoFnReflector {
}
@Override
+ public Duration getAllowedTimestampSkew() {
+ return fn.getAllowedTimestampSkew();
+ }
+
+ @Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.include(fn);
}
[8/9] incubator-beam git commit: Port easy Java SDK tests to new DoFn
Posted by ke...@apache.org.
Port easy Java SDK tests to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1959ddbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1959ddbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1959ddbe
Branch: refs/heads/master
Commit: 1959ddbedb2ad61824bf28e1e9139cc677a2aaf5
Parents: f5011e5
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:15:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/PipelineTest.java | 8 ++---
.../apache/beam/sdk/coders/AvroCoderTest.java | 6 ++--
.../beam/sdk/coders/CoderRegistryTest.java | 10 +++---
.../beam/sdk/coders/SerializableCoderTest.java | 10 +++---
.../apache/beam/sdk/io/CountingInputTest.java | 6 ++--
.../apache/beam/sdk/io/CountingSourceTest.java | 6 ++--
.../beam/sdk/io/PubsubUnboundedSinkTest.java | 6 ++--
.../sdk/transforms/ApproximateUniqueTest.java | 6 ++--
.../beam/sdk/transforms/CombineFnsTest.java | 4 +--
.../apache/beam/sdk/transforms/CombineTest.java | 18 +++++------
.../apache/beam/sdk/transforms/CreateTest.java | 4 +--
.../apache/beam/sdk/transforms/FlattenTest.java | 8 ++---
.../beam/sdk/transforms/GroupByKeyTest.java | 8 ++---
.../beam/sdk/transforms/WithTimestampsTest.java | 12 +++----
.../display/DisplayDataEvaluatorTest.java | 10 +++---
.../sdk/transforms/display/DisplayDataTest.java | 6 ++--
.../sdk/transforms/join/CoGroupByKeyTest.java | 34 ++++++++++----------
.../sdk/transforms/windowing/WindowTest.java | 10 +++---
.../sdk/transforms/windowing/WindowingTest.java | 23 ++++++-------
.../beam/sdk/values/PCollectionTupleTest.java | 6 ++--
.../apache/beam/sdk/values/TypedPValueTest.java | 10 +++---
21 files changed, 106 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 5137031..8b86499 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.UserCodeException;
@@ -146,9 +146,9 @@ public class PipelineTest {
private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
final String suffix) {
- return ParDo.of(new OldDoFn<String, String>() {
- @Override
- public void processElement(OldDoFn<String, String>.ProcessContext c) {
+ return ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(DoFn<String, String>.ProcessContext c) {
c.output(c.element() + suffix);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 41d0932..3b13e35 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -134,8 +134,8 @@ public class AvroCoderTest {
}
}
- private static class GetTextFn extends OldDoFn<Pojo, String> {
- @Override
+ private static class GetTextFn extends DoFn<Pojo, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().text);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 35ec6c6..da15405 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
@@ -366,8 +366,8 @@ public class CoderRegistryTest {
private static class PTransformOutputingMySerializableGeneric
extends PTransform<PCollection<String>, PCollection<KV<String, MySerializableGeneric<String>>>> {
- private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<String>>> {
- @Override
+ private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<String>>> {
+ @ProcessElement
public void processElement(ProcessContext c) { }
}
@@ -430,8 +430,8 @@ public class CoderRegistryTest {
PCollection<String>,
PCollection<KV<String, MySerializableGeneric<T>>>> {
- private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<T>>> {
- @Override
+ private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<T>>> {
+ @ProcessElement
public void processElement(ProcessContext c) { }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index 3e7fd50..b5465fa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.CoderUtils;
@@ -82,15 +82,15 @@ public class SerializableCoderTest implements Serializable {
}
}
- static class StringToRecord extends OldDoFn<String, MyRecord> {
- @Override
+ static class StringToRecord extends DoFn<String, MyRecord> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(new MyRecord(c.element()));
}
}
- static class RecordToString extends OldDoFn<MyRecord, String> {
- @Override
+ static class RecordToString extends DoFn<MyRecord, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().value);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 95f7454..4ec2c9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -120,8 +120,8 @@ public class CountingInputTest {
assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
}
- private static class ElementValueDiff extends OldDoFn<Long, Long> {
- @Override
+ private static class ElementValueDiff extends DoFn<Long, Long> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 45f636f..0bd91c1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -159,8 +159,8 @@ public class CountingSourceTest {
p.run();
}
- private static class ElementValueDiff extends OldDoFn<Long, Long> {
- @Override
+ private static class ElementValueDiff extends DoFn<Long, Long> {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index f8592c9..db03a5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
@@ -58,8 +58,8 @@ public class PubsubUnboundedSinkTest {
private static final String ID_LABEL = "id";
private static final int NUM_SHARDS = 10;
- private static class Stamp extends OldDoFn<String, String> {
- @Override
+ private static class Stamp extends DoFn<String, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 5c8732f..7b6d671 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -54,7 +54,7 @@ import java.util.List;
*/
@RunWith(JUnit4.class)
public class ApproximateUniqueTest implements Serializable {
- // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses
+ // implements Serializable just to make it easy to use anonymous inner DoFn subclasses
@Test
public void testEstimationErrorToSampleSize() {
@@ -223,8 +223,8 @@ public class ApproximateUniqueTest implements Serializable {
.apply(View.<Long>asSingleton());
PCollection<KV<Long, Long>> approximateAndExact = approximate
- .apply(ParDo.of(new OldDoFn<Long, KV<Long, Long>>() {
- @Override
+ .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), c.sideInput(exact)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index d6bf826..95ba1aa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -461,7 +461,7 @@ public class CombineFnsTest {
}
private static class ExtractResultDoFn
- extends OldDoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
+ extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
private final TupleTag<Integer> maxIntTag;
private final TupleTag<UserString> concatStringTag;
@@ -471,7 +471,7 @@ public class CombineFnsTest {
this.concatStringTag = concatStringTag;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
UserString userString = c.element().getValue().get(concatStringTag);
KV<Integer, String> value = KV.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index cb9928e..6421b3b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -117,7 +117,7 @@ public class CombineTest implements Serializable {
1, 1, 2, 3, 5, 8, 13, 21, 34, 55
};
- @Mock private OldDoFn<?, ?>.ProcessContext processContext;
+ @Mock private DoFn<?, ?>.ProcessContext processContext;
PCollection<KV<String, Integer>> createInput(Pipeline p,
KV<String, Integer>[] table) {
@@ -372,8 +372,8 @@ public class CombineTest implements Serializable {
pipeline.run();
}
- private static class FormatPaneInfo extends OldDoFn<Integer, String> {
- @Override
+ private static class FormatPaneInfo extends DoFn<Integer, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + ": " + c.pane().isLast());
}
@@ -560,8 +560,8 @@ public class CombineTest implements Serializable {
pipeline.run();
}
- private static class GetLast extends OldDoFn<Integer, Integer> {
- @Override
+ private static class GetLast extends DoFn<Integer, Integer> {
+ @ProcessElement
public void processElement(ProcessContext c) {
if (c.pane().isLast()) {
c.output(c.element());
@@ -653,8 +653,8 @@ public class CombineTest implements Serializable {
PCollection<Integer> output = pipeline
.apply("CreateVoidMainInput", Create.of((Void) null))
- .apply("OutputSideInput", ParDo.of(new OldDoFn<Void, Integer>() {
- @Override
+ .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
}
@@ -1176,8 +1176,8 @@ public class CombineTest implements Serializable {
}
private static <T> PCollection<T> copy(PCollection<T> pc, final int n) {
- return pc.apply(ParDo.of(new OldDoFn<T, T>() {
- @Override
+ return pc.apply(ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
for (int i = 0; i < n; i++) {
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index cf65423..9db0136 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -229,8 +229,8 @@ public class CreateTest {
p.run();
}
- private static class PrintTimestamps extends OldDoFn<String, String> {
- @Override
+ private static class PrintTimestamps extends DoFn<String, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element() + ":" + c.timestamp().getMillis());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index b81eedb..604536b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -130,8 +130,8 @@ public class FlattenTest implements Serializable {
PCollection<String> output = p
.apply(Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() {
- @Override
+ .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (String side : c.sideInput(view)) {
c.output(side);
@@ -339,8 +339,8 @@ public class FlattenTest implements Serializable {
/////////////////////////////////////////////////////////////////////////////
- private static class IdentityFn<T> extends OldDoFn<T, T> {
- @Override
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 15c3ba8..afe460f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -371,14 +371,14 @@ public class GroupByKeyTest {
pipeline.run();
}
- private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> {
+ private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> {
private final Instant timestamp;
public AssertTimestamp(Instant timestamp) {
this.timestamp = timestamp;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat(c.timestamp(), equalTo(timestamp));
}
@@ -506,9 +506,9 @@ public class GroupByKeyTest {
* Creates a KV that wraps the original KV together with a random key.
*/
static class AssignRandomKey
- extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
+ extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index d2ba452..e381470 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable {
.apply(WithTimestamps.of(timestampFn));
PCollection<KV<String, Instant>> timestampedVals =
- timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
- @Override
- public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+ timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ @ProcessElement
+ public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
@@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable {
WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L)));
PCollection<KV<String, Instant>> timestampedVals =
- timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
- @Override
- public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
+ timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ @ProcessElement
+ public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index c1848c6..e233114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
@@ -50,8 +50,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
new PTransform<PCollection<String>, POutput> () {
@Override
public PCollection<String> apply(PCollection<String> input) {
- return input.apply(ParDo.of(new OldDoFn<String, String>() {
- @Override
+ return input.apply(ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
@@ -79,8 +79,8 @@ public class DisplayDataEvaluatorTest implements Serializable {
@Test
public void testPrimitiveTransform() {
PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of(
- new OldDoFn<Integer, Integer>() {
- @Override
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index 517f968..e2f38b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -1053,8 +1053,8 @@ public class DisplayDataTest implements Serializable {
private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> input) {
- return input.apply(ParDo.of(new OldDoFn<T, T>() {
- @Override
+ return input.apply(ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 97667a3..c6f82ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -29,9 +29,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -84,10 +83,11 @@ public class CoGroupByKeyTest implements Serializable {
input = p.apply("Create" + name, Create.timestamped(list, timestamps)
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
}
- return input
- .apply("Identity" + name, ParDo.of(new OldDoFn<KV<Integer, String>,
- KV<Integer, String>>() {
- @Override
+ return input.apply(
+ "Identity" + name,
+ ParDo.of(
+ new DoFn<KV<Integer, String>, KV<Integer, String>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
@@ -313,11 +313,11 @@ public class CoGroupByKeyTest implements Serializable {
}
/**
- * A OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the
- * results of a CoGroupByKey.
+ * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the results of a
+ * CoGroupByKey.
*/
- private static class ClickOfPurchaseFn extends
- OldDoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess {
+ private static class ClickOfPurchaseFn
+ extends DoFn<KV<Integer, CoGbkResult>, KV<String, String>> {
private final TupleTag<String> clicksTag;
private final TupleTag<String> purchasesTag;
@@ -329,9 +329,9 @@ public class CoGroupByKeyTest implements Serializable {
this.purchasesTag = purchasesTag;
}
- @Override
- public void processElement(ProcessContext c) {
- BoundedWindow w = c.window();
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ BoundedWindow w = window;
KV<Integer, CoGbkResult> e = c.element();
CoGbkResult row = e.getValue();
Iterable<String> clicks = row.getAll(clicksTag);
@@ -347,11 +347,11 @@ public class CoGroupByKeyTest implements Serializable {
/**
- * A OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the
+ * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the
* results of a CoGroupByKey.
*/
private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends
- OldDoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
+ DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
private final TupleTag<String> purchasesTag;
private final TupleTag<String> addressesTag;
@@ -367,7 +367,7 @@ public class CoGroupByKeyTest implements Serializable {
this.namesTag = namesTag;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<Integer, CoGbkResult> e = c.element();
CoGbkResult row = e.getValue();
@@ -401,7 +401,7 @@ public class CoGroupByKeyTest implements Serializable {
}
/**
- * Tests that the consuming OldDoFn
+ * Tests that the consuming DoFn
* (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected.
*/
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 27d2539..c583860 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -199,8 +199,8 @@ public class WindowTest implements Serializable {
.apply(GroupByKey.<Integer, String>create())
.apply(
ParDo.of(
- new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
- @Override
+ new DoFn<KV<Integer, Iterable<String>>, Void>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat(
c.timestamp(),
@@ -231,8 +231,8 @@ public class WindowTest implements Serializable {
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
.apply(GroupByKey.<Integer, String>create())
- .apply(ParDo.of(new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
- @Override
+ .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 622a277..159e700 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -26,9 +26,8 @@ import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -58,12 +57,14 @@ public class WindowingTest implements Serializable {
private static class WindowedCount extends PTransform<PCollection<String>, PCollection<String>> {
- private final class FormatCountsDoFn
- extends OldDoFn<KV<String, Long>, String> implements RequiresWindowAccess {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey() + ":" + c.element().getValue()
- + ":" + c.timestamp().getMillis() + ":" + c.window());
+ private final class FormatCountsDoFn extends DoFn<KV<String, Long>, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ c.output(
+ c.element().getKey()
+ + ":" + c.element().getValue()
+ + ":" + c.timestamp().getMillis()
+ + ":" + window);
}
}
private WindowFn<? super String, ?> windowFn;
@@ -234,9 +235,9 @@ public class WindowingTest implements Serializable {
p.run();
}
- /** A OldDoFn that tokenizes lines of text into individual words. */
- static class ExtractWordsWithTimestampsFn extends OldDoFn<String, String> {
- @Override
+ /** A DoFn that tokenizes lines of text into individual words. */
+ static class ExtractWordsWithTimestampsFn extends DoFn<String, String> {
+ @ProcessElement
public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z0-9']+");
if (words.length == 2) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 547c778..13218b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -75,8 +75,8 @@ public final class PCollectionTupleTest implements Serializable {
.apply(Create.of(inputs));
PCollectionTuple outputs = mainInput.apply(ParDo
- .of(new OldDoFn<Integer, Integer>() {
- @Override
+ .of(new DoFn<Integer, Integer>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.sideOutput(sideOutputTag, c.element());
}})
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1959ddbe/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index c525cf1..287223f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.Rule;
@@ -44,9 +44,9 @@ public class TypedPValueTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
- private static class IdentityDoFn extends OldDoFn<Integer, Integer> {
+ private static class IdentityDoFn extends DoFn<Integer, Integer> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
@@ -129,9 +129,9 @@ public class TypedPValueTest {
static class EmptyClass {
}
- private static class EmptyClassDoFn extends OldDoFn<Integer, EmptyClass> {
+ private static class EmptyClassDoFn extends DoFn<Integer, EmptyClass> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(new EmptyClass());
}