You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:40 UTC

[05/13] incubator-beam git commit: Port Reshuffle to new DoFn

Port Reshuffle to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf21a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf21a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf21a5c

Branch: refs/heads/master
Commit: ecf21a5cc177c39e515e4c78e16b579ac298c999
Parents: d798413
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:47:23 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf21a5c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 66c7cc0..ad33a25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -70,8 +70,8 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
         // set allowed lateness.
         .setWindowingStrategyInternal(originalStrategy)
         .apply("ExpandIterable", ParDo.of(
-            new OldDoFn<KV<K, Iterable<V>>, KV<K, V>>() {
-              @Override
+            new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+              @ProcessElement
               public void processElement(ProcessContext c) {
                 K key = c.element().getKey();
                 for (V value : c.element().getValue()) {