You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/04 06:14:53 UTC
[1/2] beam git commit: Fix race condition when outputting pushed-back
elements in Flink Runner
Repository: beam
Updated Branches:
refs/heads/master 43c44232d -> ef56ea495
Fix race condition when outputting pushed-back elements in Flink Runner
This affected the Flink Streaming Runner DoFnOperator. The recent fix of
emitting pushed-back data when receiving a watermark on the first input
put the emission at the end of the method. This can cause the emitted
data to become late. The fix is to move the pushed-back element emission
to the start of the method.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d17c0132
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d17c0132
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d17c0132
Branch: refs/heads/master
Commit: d17c013240a14b12992cf00f30e5151c7e97f360
Parents: 43c4423
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Jun 2 15:57:01 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Jun 4 08:08:14 2017 +0200
----------------------------------------------------------------------
.../wrappers/streaming/DoFnOperator.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d17c0132/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index e473046..594fe0e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -445,6 +445,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public void processWatermark1(Watermark mark) throws Exception {
+ // We do the check here because we are guaranteed to at least get the +Inf watermark on the
+ // main input when the job finishes.
+ if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ // this means we will never see any more side input
+ // we also do the check here because we might have received the side-input MAX watermark
+ // before receiving any main-input data
+ emitAllPushedBackData();
+ }
+
if (keyCoder == null) {
setCurrentInputWatermark(mark.getTimestamp());
long potentialOutputWatermark =
@@ -476,15 +485,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
pushbackDoFnRunner.finishBundle();
}
-
- // We do the check here because we are guaranteed to at least get the +Inf watermark on the
- // main input when the job finishes.
- if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- // this means we will never see any more side input
- // we also do the check here because we might have received the side-input MAX watermark
- // before receiving any main-input data
- emitAllPushedBackData();
- }
}
@Override
[2/2] beam git commit: This closes #3285
Posted by al...@apache.org.
This closes #3285
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef56ea49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef56ea49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef56ea49
Branch: refs/heads/master
Commit: ef56ea49569f00e098045ec01812b962adfb157d
Parents: 43c4423 d17c013
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Jun 4 08:14:08 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sun Jun 4 08:14:08 2017 +0200
----------------------------------------------------------------------
.../wrappers/streaming/DoFnOperator.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------