You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:40 UTC
[05/13] incubator-beam git commit: Port Reshuffle to new DoFn
Port Reshuffle to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf21a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf21a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf21a5c
Branch: refs/heads/master
Commit: ecf21a5cc177c39e515e4c78e16b579ac298c999
Parents: d798413
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 11:47:23 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700
----------------------------------------------------------------------
.../core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf21a5c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 66c7cc0..ad33a25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -70,8 +70,8 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// set allowed lateness.
.setWindowingStrategyInternal(originalStrategy)
.apply("ExpandIterable", ParDo.of(
- new OldDoFn<KV<K, Iterable<V>>, KV<K, V>>() {
- @Override
+ new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
K key = c.element().getKey();
for (V value : c.element().getValue()) {