You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:42 UTC
[04/17] incubator-beam git commit: Fix Emission in
startBundle/finishBundle in Flink Wrappers
Fix Emission in startBundle/finishBundle in Flink Wrappers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d94bffdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d94bffdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d94bffdd
Branch: refs/heads/master
Commit: d94bffdd20f7bb2f380c807f84c5405552a40f71
Parents: a87015b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Jun 11 10:55:55 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700
----------------------------------------------------------------------
.../flink/translation/functions/FlinkDoFnFunction.java | 4 ++--
.../translation/functions/FlinkMultiOutputDoFnFunction.java | 7 +++++--
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d94bffdd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index fdf1e59..733d3d4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -94,8 +94,8 @@ public class FlinkDoFnFunction<InputT, OutputT>
}
}
- // set the windowed value to null so that the logic
- // or outputting in finishBundle kicks in
+ // set the windowed value to null so that the special logic for outputting
+ // in startBundle/finishBundle kicks in
context = context.forWindowedValue(null);
this.doFn.finishBundle(context);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d94bffdd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 5013b90..ef75878 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -33,8 +33,8 @@ import org.apache.flink.util.Collector;
import java.util.Map;
/**
- * Encapsulates a {@link OldDoFn} that uses side outputs
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
+ * Encapsulates a {@link OldDoFn} that can emit to multiple
+ * outputs inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
*
* We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
* and must tag all outputs with the output number. Afterwards a filter will filter out
@@ -106,6 +106,9 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
}
}
+ // set the windowed value to null so that the special logic for outputting
+ // in startBundle/finishBundle kicks in
+ context = context.forWindowedValue(null);
this.doFn.finishBundle(context);
}