You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/06 02:53:00 UTC

[42/51] [abbrv] incubator-beam git commit: Port easy transforms to new DoFn

Port easy transforms to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47341e11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47341e11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47341e11

Branch: refs/heads/python-sdk
Commit: 47341e113334827101ddbf775c69ae34d178cd8f
Parents: 269fbf3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Aug 3 20:27:28 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Aug 4 14:56:42 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/Count.java  |  4 ++--
 .../java/org/apache/beam/sdk/transforms/Create.java |  4 ++--
 .../apache/beam/sdk/transforms/FlatMapElements.java |  4 ++--
 .../org/apache/beam/sdk/transforms/Flatten.java     |  4 ++--
 .../java/org/apache/beam/sdk/transforms/Keys.java   |  4 ++--
 .../java/org/apache/beam/sdk/transforms/KvSwap.java |  4 ++--
 .../org/apache/beam/sdk/transforms/MapElements.java |  4 ++--
 .../org/apache/beam/sdk/transforms/Partition.java   |  4 ++--
 .../beam/sdk/transforms/RemoveDuplicates.java       |  4 ++--
 .../java/org/apache/beam/sdk/transforms/Sample.java |  6 +++---
 .../java/org/apache/beam/sdk/transforms/Values.java |  4 ++--
 .../java/org/apache/beam/sdk/transforms/View.java   |  8 ++++----
 .../org/apache/beam/sdk/transforms/WithKeys.java    |  4 ++--
 .../apache/beam/sdk/transforms/WithTimestamps.java  |  6 +++---
 .../beam/sdk/transforms/join/CoGroupByKey.java      | 16 ++++++++--------
 15 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 7601ffc..ac59c76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,8 +107,8 @@ public class Count {
     public PCollection<KV<T, Long>> apply(PCollection<T> input) {
       return
           input
-          .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
-            @Override
+          .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
+            @ProcessElement
             public void processElement(ProcessContext c) {
               c.output(KV.of(c.element(), (Void) null));
             }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index fb7f784..08d0a7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -486,8 +486,8 @@ public class Create<T> {
       this.elementCoder = elementCoder;
     }
 
-    private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> {
-      @Override
+    private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index b48da38..694592e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -133,9 +133,9 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
+    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
       private static final long serialVersionUID = 0L;
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) {
         for (OutputT element : fn.apply(c.element())) {
           c.output(element);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 53e898e..7e09d7e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -174,8 +174,8 @@ public class Flatten {
       Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
 
       return in.apply("FlattenIterables", ParDo.of(
-          new OldDoFn<Iterable<T>, T>() {
-            @Override
+          new DoFn<Iterable<T>, T>() {
+            @ProcessElement
             public void processElement(ProcessContext c) {
               for (T i : c.element()) {
                 c.output(i);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index c8cbce8..5ac1866 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,8 +58,8 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
   @Override
   public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
     return
-        in.apply("Keys", ParDo.of(new OldDoFn<KV<K, ?>, K>() {
-          @Override
+        in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.element().getKey());
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 430d37b..d4386d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,8 +62,8 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
   @Override
   public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
     return
-        in.apply("KvSwap", ParDo.of(new OldDoFn<KV<K, V>, KV<V, K>>() {
-          @Override
+        in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             KV<K, V> e = c.element();
             c.output(KV.of(e.getValue(), e.getKey()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index c83c39f..b7b9a5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -104,8 +104,8 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
-      @Override
+    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(fn.apply(c.element()));
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 2ddcc29..05c9470 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -134,7 +134,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
     this.partitionDoFn = partitionDoFn;
   }
 
-  private static class PartitionDoFn<X> extends OldDoFn<X, Void> {
+  private static class PartitionDoFn<X> extends DoFn<X, Void> {
     private final int numPartitions;
     private final PartitionFn<? super X> partitionFn;
     private final TupleTagList outputTags;
@@ -163,7 +163,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
       return outputTags;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       X input = c.element();
       int partition = partitionFn.partitionFor(input, numPartitions);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index d82c457..bba4b51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,8 +85,8 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
   @Override
   public PCollection<T> apply(PCollection<T> in) {
     return in
-        .apply("CreateIndex", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
-          @Override
+        .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(KV.of(c.element(), (Void) null));
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 724b252..12ff2b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -164,9 +164,9 @@ public class Sample {
   }
 
   /**
-   * A {@link OldDoFn} that returns up to limit elements from the side input PCollection.
+   * A {@link DoFn} that returns up to limit elements from the side input PCollection.
    */
-  private static class SampleAnyDoFn<T> extends OldDoFn<Void, T> {
+  private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
     long limit;
     final PCollectionView<Iterable<T>> iterableView;
 
@@ -175,7 +175,7 @@ public class Sample {
       this.iterableView = iterableView;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       for (T i : c.sideInput(iterableView)) {
         if (limit-- <= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 856e32a..34342db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,8 +58,8 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
   @Override
   public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
     return
-        in.apply("Values", ParDo.of(new OldDoFn<KV<?, V>, V>() {
-          @Override
+        in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.element().getValue());
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 8a61637..7a97c13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -38,7 +38,7 @@ import java.util.Map;
  *
  * <p>When a {@link ParDo} transform is processing a main input
  * element in a window {@code w} and a {@link PCollectionView} is read via
- * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
+ * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
  * returned.
  *
  * <p>The SDK supports viewing a {@link PCollection}, per window, as a single value,
@@ -118,7 +118,7 @@ import java.util.Map;
  *
  * PCollection PageVisits = urlVisits
  *     .apply(ParDo.withSideInputs(urlToPage)
- *         .of(new OldDoFn<UrlVisit, PageVisit>() {
+ *         .of(new DoFn<UrlVisit, PageVisit>() {
  *             {@literal @}Override
  *             void processElement(ProcessContext context) {
  *               UrlVisit urlVisit = context.element();
@@ -154,11 +154,11 @@ public class View {
    *
    * <p>If the input {@link PCollection} is empty,
    * throws {@link java.util.NoSuchElementException} in the consuming
-   * {@link OldDoFn}.
+   * {@link DoFn}.
    *
    * <p>If the input {@link PCollection} contains more than one
    * element, throws {@link IllegalArgumentException} in the
-   * consuming {@link OldDoFn}.
+   * consuming {@link DoFn}.
    */
   public static <T> AsSingleton<T> asSingleton() {
     return new AsSingleton<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 37d45aa..2a44963 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,8 +113,8 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
   @Override
   public PCollection<KV<K, V>> apply(PCollection<V> in) {
     PCollection<KV<K, V>> result =
-        in.apply("AddKeys", ParDo.of(new OldDoFn<V, KV<K, V>>() {
-          @Override
+        in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(KV.of(fn.apply(c.element()),
                 c.element()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 41b549b..7b395f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -92,7 +92,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    * Returns the allowed timestamp skew duration, which is the maximum
    * duration that timestamps can be shifted backwards from the timestamp of the input element.
    *
-   * @see OldDoFn#getAllowedTimestampSkew()
+   * @see DoFn#getAllowedTimestampSkew()
    */
   public Duration getAllowedTimestampSkew() {
     return allowedTimestampSkew;
@@ -105,7 +105,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
         .setTypeDescriptorInternal(input.getTypeDescriptor());
   }
 
-  private static class AddTimestampsDoFn<T> extends OldDoFn<T, T> {
+  private static class AddTimestampsDoFn<T> extends DoFn<T, T> {
     private final SerializableFunction<T, Instant> fn;
     private final Duration allowedTimestampSkew;
 
@@ -114,7 +114,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
       this.allowedTimestampSkew = allowedTimestampSkew;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Instant timestamp = fn.apply(c.element());
       checkNotNull(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index 1bd9f4a..cb06f95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.transforms.join;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
@@ -57,7 +57,7 @@ import java.util.List;
  *
  * PCollection<T> finalResultCollection =
  *   coGbkResultCollection.apply(ParDo.of(
- *     new OldDoFn<KV<K, CoGbkResult>, T>() {
+ *     new DoFn<KV<K, CoGbkResult>, T>() {
  *       @Override
  *       public void processElement(ProcessContext c) {
  *         KV<K, CoGbkResult> e = c.element();
@@ -167,12 +167,12 @@ public class CoGroupByKey<K> extends
   }
 
   /**
-   * A OldDoFn to construct a UnionTable (i.e., a
+   * A DoFn to construct a UnionTable (i.e., a
    * {@code PCollection<KV<K, RawUnionValue>>} from a
    * {@code PCollection<KV<K, V>>}.
    */
   private static class ConstructUnionTableFn<K, V> extends
-      OldDoFn<KV<K, V>, KV<K, RawUnionValue>> {
+      DoFn<KV<K, V>, KV<K, RawUnionValue>> {
 
     private final int index;
 
@@ -180,7 +180,7 @@ public class CoGroupByKey<K> extends
       this.index = index;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<K, ?> e = c.element();
       c.output(KV.of(e.getKey(), new RawUnionValue(index, e.getValue())));
@@ -188,11 +188,11 @@ public class CoGroupByKey<K> extends
   }
 
   /**
-   * A OldDoFn to construct a CoGbkResult from an input grouped union
+   * A DoFn to construct a CoGbkResult from an input grouped union
    * table.
     */
   private static class ConstructCoGbkResultFn<K>
-    extends OldDoFn<KV<K, Iterable<RawUnionValue>>,
+    extends DoFn<KV<K, Iterable<RawUnionValue>>,
                      KV<K, CoGbkResult>> {
 
     private final CoGbkResultSchema schema;
@@ -201,7 +201,7 @@ public class CoGroupByKey<K> extends
       this.schema = schema;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       KV<K, Iterable<RawUnionValue>> e = c.element();
       c.output(KV.of(e.getKey(), new CoGbkResult(schema, e.getValue())));