You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:42 UTC

[04/17] incubator-beam git commit: Fix Emission in startBundle/finishBundle in Flink Wrappers

Fix Emission in startBundle/finishBundle in Flink Wrappers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d94bffdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d94bffdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d94bffdd

Branch: refs/heads/master
Commit: d94bffdd20f7bb2f380c807f84c5405552a40f71
Parents: a87015b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Jun 11 10:55:55 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700

----------------------------------------------------------------------
 .../flink/translation/functions/FlinkDoFnFunction.java        | 4 ++--
 .../translation/functions/FlinkMultiOutputDoFnFunction.java   | 7 +++++--
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d94bffdd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index fdf1e59..733d3d4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -94,8 +94,8 @@ public class FlinkDoFnFunction<InputT, OutputT>
       }
     }
 
-    // set the windowed value to null so that the logic
-    // or outputting in finishBundle kicks in
+    // set the windowed value to null so that the special logic for outputting
+    // in startBundle/finishBundle kicks in
     context = context.forWindowedValue(null);
     this.doFn.finishBundle(context);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d94bffdd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 5013b90..ef75878 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -33,8 +33,8 @@ import org.apache.flink.util.Collector;
 import java.util.Map;
 
 /**
- * Encapsulates a {@link OldDoFn} that uses side outputs
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
+ * Encapsulates an {@link OldDoFn} that can emit to multiple
+ * outputs inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
  *
  * We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
  * and must tag all outputs with the output number. Afterwards a filter will filter out
@@ -106,6 +106,9 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
       }
     }
 
+    // set the windowed value to null so that the special logic for outputting
+    // in startBundle/finishBundle kicks in
+    context = context.forWindowedValue(null);
     this.doFn.finishBundle(context);
   }