You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/08 20:40:38 UTC

[03/13] incubator-beam git commit: Port most of Combine to new DoFn

Port most of Combine to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/331f5234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/331f5234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/331f5234

Branch: refs/heads/master
Commit: 331f523461094af666a20bd97e1e15f1dec3feba
Parents: b1db02d
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 12:11:11 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Aug 8 11:35:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/331f5234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6fc2324..a825800 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1473,9 +1473,9 @@ public class Combine {
       PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
           .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
           .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
-              new OldDoFn<Void, OutputT>() {
-                @Override
-                public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) {
+              new DoFn<Void, OutputT>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
                   Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator();
                   if (!combined.hasNext()) {
                     c.output(defaultValue);
@@ -2097,15 +2097,15 @@ public class Combine {
       final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>();
       final TupleTag<KV<K, InputT>> cold = new TupleTag<>();
       PCollectionTuple split = input.apply("AddNonce", ParDo.of(
-          new OldDoFn<KV<K, InputT>, KV<K, InputT>>() {
+          new DoFn<KV<K, InputT>, KV<K, InputT>>() {
             transient int counter;
-            @Override
+            @StartBundle
             public void startBundle(Context c) {
               counter = ThreadLocalRandom.current().nextInt(
                   Integer.MAX_VALUE);
             }
 
-            @Override
+            @ProcessElement
             public void processElement(ProcessContext c) {
               KV<K, InputT> kv = c.element();
               int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
@@ -2135,9 +2135,9 @@ public class Combine {
           .setWindowingStrategyInternal(preCombineStrategy)
           .apply("PreCombineHot", Combine.perKey(hotPreCombine))
           .apply("StripNonce", ParDo.of(
-              new OldDoFn<KV<KV<K, Integer>, AccumT>,
+              new DoFn<KV<KV<K, Integer>, AccumT>,
                                      KV<K, InputOrAccum<InputT, AccumT>>>() {
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.output(KV.of(
                       c.element().getKey().getKey(),
@@ -2151,8 +2151,8 @@ public class Combine {
           .get(cold)
           .setCoder(inputCoder)
           .apply("PrepareCold", ParDo.of(
-              new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
-                @Override
+              new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.output(KV.of(c.element().getKey(),
                                  InputOrAccum.<InputT, AccumT>input(c.element().getValue())));