You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:53:00 UTC
[42/51] [abbrv] incubator-beam git commit: Port easy transforms to new DoFn
Port easy transforms to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47341e11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47341e11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47341e11
Branch: refs/heads/python-sdk
Commit: 47341e113334827101ddbf775c69ae34d178cd8f
Parents: 269fbf3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:27:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/Count.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/Create.java | 4 ++--
.../apache/beam/sdk/transforms/FlatMapElements.java | 4 ++--
.../org/apache/beam/sdk/transforms/Flatten.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/Keys.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/KvSwap.java | 4 ++--
.../org/apache/beam/sdk/transforms/MapElements.java | 4 ++--
.../org/apache/beam/sdk/transforms/Partition.java | 4 ++--
.../beam/sdk/transforms/RemoveDuplicates.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/Sample.java | 6 +++---
.../java/org/apache/beam/sdk/transforms/Values.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/View.java | 8 ++++----
.../org/apache/beam/sdk/transforms/WithKeys.java | 4 ++--
.../apache/beam/sdk/transforms/WithTimestamps.java | 6 +++---
.../beam/sdk/transforms/join/CoGroupByKey.java | 16 ++++++++--------
15 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 7601ffc..ac59c76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,8 +107,8 @@ public class Count {
public PCollection<KV<T, Long>> apply(PCollection<T> input) {
return
input
- .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
- @Override
+ .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), (Void) null));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index fb7f784..08d0a7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -486,8 +486,8 @@ public class Create<T> {
this.elementCoder = elementCoder;
}
- private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> {
- @Override
+ private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index b48da38..694592e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -133,9 +133,9 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
@Override
public PCollection<OutputT> apply(PCollection<InputT> input) {
- return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
+ return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
private static final long serialVersionUID = 0L;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for (OutputT element : fn.apply(c.element())) {
c.output(element);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 53e898e..7e09d7e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -174,8 +174,8 @@ public class Flatten {
Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
return in.apply("FlattenIterables", ParDo.of(
- new OldDoFn<Iterable<T>, T>() {
- @Override
+ new DoFn<Iterable<T>, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
for (T i : c.element()) {
c.output(i);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index c8cbce8..5ac1866 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,8 +58,8 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
@Override
public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
return
- in.apply("Keys", ParDo.of(new OldDoFn<KV<K, ?>, K>() {
- @Override
+ in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getKey());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 430d37b..d4386d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,8 +62,8 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
@Override
public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
return
- in.apply("KvSwap", ParDo.of(new OldDoFn<KV<K, V>, KV<V, K>>() {
- @Override
+ in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, V> e = c.element();
c.output(KV.of(e.getValue(), e.getKey()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index c83c39f..b7b9a5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -104,8 +104,8 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
@Override
public PCollection<OutputT> apply(PCollection<InputT> input) {
- return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
- @Override
+ return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(fn.apply(c.element()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 2ddcc29..05c9470 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -134,7 +134,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
this.partitionDoFn = partitionDoFn;
}
- private static class PartitionDoFn<X> extends OldDoFn<X, Void> {
+ private static class PartitionDoFn<X> extends DoFn<X, Void> {
private final int numPartitions;
private final PartitionFn<? super X> partitionFn;
private final TupleTagList outputTags;
@@ -163,7 +163,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
return outputTags;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
X input = c.element();
int partition = partitionFn.partitionFor(input, numPartitions);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index d82c457..bba4b51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,8 +85,8 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
@Override
public PCollection<T> apply(PCollection<T> in) {
return in
- .apply("CreateIndex", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
- @Override
+ .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), (Void) null));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 724b252..12ff2b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -164,9 +164,9 @@ public class Sample {
}
/**
- * A {@link OldDoFn} that returns up to limit elements from the side input PCollection.
+ * A {@link DoFn} that returns up to limit elements from the side input PCollection.
*/
- private static class SampleAnyDoFn<T> extends OldDoFn<Void, T> {
+ private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
long limit;
final PCollectionView<Iterable<T>> iterableView;
@@ -175,7 +175,7 @@ public class Sample {
this.iterableView = iterableView;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for (T i : c.sideInput(iterableView)) {
if (limit-- <= 0) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 856e32a..34342db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,8 +58,8 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
@Override
public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
return
- in.apply("Values", ParDo.of(new OldDoFn<KV<?, V>, V>() {
- @Override
+ in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 8a61637..7a97c13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -38,7 +38,7 @@ import java.util.Map;
*
* <p>When a {@link ParDo} transform is processing a main input
* element in a window {@code w} and a {@link PCollectionView} is read via
- * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
+ * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
* returned.
*
* <p>The SDK supports viewing a {@link PCollection}, per window, as a single value,
@@ -118,7 +118,7 @@ import java.util.Map;
*
* PCollection PageVisits = urlVisits
* .apply(ParDo.withSideInputs(urlToPage)
- * .of(new OldDoFn<UrlVisit, PageVisit>() {
+ * .of(new DoFn<UrlVisit, PageVisit>() {
* {@literal @}Override
* void processElement(ProcessContext context) {
* UrlVisit urlVisit = context.element();
@@ -154,11 +154,11 @@ public class View {
*
* <p>If the input {@link PCollection} is empty,
* throws {@link java.util.NoSuchElementException} in the consuming
- * {@link OldDoFn}.
+ * {@link DoFn}.
*
* <p>If the input {@link PCollection} contains more than one
* element, throws {@link IllegalArgumentException} in the
- * consuming {@link OldDoFn}.
+ * consuming {@link DoFn}.
*/
public static <T> AsSingleton<T> asSingleton() {
return new AsSingleton<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 37d45aa..2a44963 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,8 +113,8 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
@Override
public PCollection<KV<K, V>> apply(PCollection<V> in) {
PCollection<KV<K, V>> result =
- in.apply("AddKeys", ParDo.of(new OldDoFn<V, KV<K, V>>() {
- @Override
+ in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(fn.apply(c.element()),
c.element()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 41b549b..7b395f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -92,7 +92,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* Returns the allowed timestamp skew duration, which is the maximum
* duration that timestamps can be shifted backwards from the timestamp of the input element.
*
- * @see OldDoFn#getAllowedTimestampSkew()
+ * @see DoFn#getAllowedTimestampSkew()
*/
public Duration getAllowedTimestampSkew() {
return allowedTimestampSkew;
@@ -105,7 +105,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
.setTypeDescriptorInternal(input.getTypeDescriptor());
}
- private static class AddTimestampsDoFn<T> extends OldDoFn<T, T> {
+ private static class AddTimestampsDoFn<T> extends DoFn<T, T> {
private final SerializableFunction<T, Instant> fn;
private final Duration allowedTimestampSkew;
@@ -114,7 +114,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
this.allowedTimestampSkew = allowedTimestampSkew;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
Instant timestamp = fn.apply(c.element());
checkNotNull(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index 1bd9f4a..cb06f95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.transforms.join;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
@@ -57,7 +57,7 @@ import java.util.List;
*
* PCollection<T> finalResultCollection =
* coGbkResultCollection.apply(ParDo.of(
- * new OldDoFn<KV<K, CoGbkResult>, T>() {
+ * new DoFn<KV<K, CoGbkResult>, T>() {
* @Override
* public void processElement(ProcessContext c) {
* KV<K, CoGbkResult> e = c.element();
@@ -167,12 +167,12 @@ public class CoGroupByKey<K> extends
}
/**
- * A OldDoFn to construct a UnionTable (i.e., a
+ * A DoFn to construct a UnionTable (i.e., a
* {@code PCollection<KV<K, RawUnionValue>>} from a
* {@code PCollection<KV<K, V>>}.
*/
private static class ConstructUnionTableFn<K, V> extends
- OldDoFn<KV<K, V>, KV<K, RawUnionValue>> {
+ DoFn<KV<K, V>, KV<K, RawUnionValue>> {
private final int index;
@@ -180,7 +180,7 @@ public class CoGroupByKey<K> extends
this.index = index;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, ?> e = c.element();
c.output(KV.of(e.getKey(), new RawUnionValue(index, e.getValue())));
@@ -188,11 +188,11 @@ public class CoGroupByKey<K> extends
}
/**
- * A OldDoFn to construct a CoGbkResult from an input grouped union
+ * A DoFn to construct a CoGbkResult from an input grouped union
* table.
*/
private static class ConstructCoGbkResultFn<K>
- extends OldDoFn<KV<K, Iterable<RawUnionValue>>,
+ extends DoFn<KV<K, Iterable<RawUnionValue>>,
KV<K, CoGbkResult>> {
private final CoGbkResultSchema schema;
@@ -201,7 +201,7 @@ public class CoGroupByKey<K> extends
this.schema = schema;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
KV<K, Iterable<RawUnionValue>> e = c.element();
c.output(KV.of(e.getKey(), new CoGbkResult(schema, e.getValue())));