You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/07/24 08:04:18 UTC

[2/4] beam git commit: [BEAM-2571] Clarify pushedback variable name in DoFnOperator

[BEAM-2571] Clarify pushedback variable name in DoFnOperator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b03c4f07
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b03c4f07
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b03c4f07

Branch: refs/heads/master
Commit: b03c4f0790fa639d739f7f3fdeaa4a703fadb8fa
Parents: 8449931
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jul 12 14:39:58 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jul 24 09:47:44 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b03c4f07/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index b1f3b86..3b234ac 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -470,15 +470,15 @@ public class DoFnOperator<InputT, OutputT>
       setCurrentInputWatermark(mark.getTimestamp());
 
       // hold back by the pushed back values waiting for side inputs
-      long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
+      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
 
-      timerService.advanceWatermark(actualInputWatermark);
+      timerService.advanceWatermark(pushedBackInputWatermark);
 
       Instant watermarkHold = stateInternals.watermarkHold();
 
       long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
 
-      long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
+      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);
 
       if (potentialOutputWatermark > currentOutputWatermark) {
         setCurrentOutputWatermark(potentialOutputWatermark);