Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/23 05:44:16 UTC

[GitHub] [beam] tweise commented on a change in pull request #11362: [BEAM-9733] Always let ImpulseSourceFunction emit a final watermark

tweise commented on a change in pull request #11362:
URL: https://github.com/apache/beam/pull/11362#discussion_r413524983



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -658,46 +671,69 @@ public void processWatermark1(Watermark mark) throws Exception {
       emitAllPushedBackData();
     }
 
-    setCurrentInputWatermark(mark.getTimestamp());
+    currentInputWatermark = mark.getTimestamp();
 
-    if (keyCoder == null) {
-      long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(), currentInputWatermark);
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        setCurrentOutputWatermark(potentialOutputWatermark);
-        emitWatermark(currentOutputWatermark);
-      }
-    } else {
-      // hold back by the pushed back values waiting for side inputs
-      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
+    long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark());
+    if (keyCoder != null) {
+      timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
+    }
 
-      timeServiceManager.advanceWatermark(new Watermark(pushedBackInputWatermark));
+    long potentialOutputWatermark =
+        applyOutputWatermarkHold(
+            currentOutputWatermark, computeOutputWatermark(inputWatermarkHold));
+    maybeEmitWatermark(potentialOutputWatermark);
+  }
 
-      Instant watermarkHold = keyedStateInternals.watermarkHold();
+  /**
+   * Allows to apply a hold to the input watermark. By default, just passes the input watermark
+   * through.
+   */
+  public long applyInputWatermarkHold(long inputWatermark) {
+    return inputWatermark;
+  }
 
-      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
-      combinedWatermarkHold =
-          Math.min(combinedWatermarkHold, timerInternals.getMinOutputTimestampMs());
-      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);
+  /**
+   * Allows to apply a hold to the output watermark before it is send out. By default, just passes
+   * the potential output watermark through which will make it the new output watermark.
+   *
+   * @param currentOutputWatermark the current output watermark
+   * @param potentialOutputWatermark The potential new output watermark which can be adjusted, if
+   *     needed. The input watermark hold has already been applied.
+   * @return The new output watermark which will be emitted.
+   */
+  public long applyOutputWatermarkHold(long currentOutputWatermark, long potentialOutputWatermark) {
+    return potentialOutputWatermark;
+  }
 
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        setCurrentOutputWatermark(potentialOutputWatermark);
-        emitWatermark(currentOutputWatermark);
-      }
+  private long computeOutputWatermark(long inputWatermarkHold) {
+    final long potentialOutputWatermark;
+    if (keyCoder == null) {
+      potentialOutputWatermark = inputWatermarkHold;
+    } else {
+      Instant watermarkHold = keyedStateInternals.watermarkHold();
+      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), inputWatermarkHold);
+      potentialOutputWatermark =
+          Math.min(combinedWatermarkHold, timerInternals.getMinOutputTimestampMs());
     }
+    return potentialOutputWatermark;
   }
 
-  private void emitWatermark(long watermark) {
-    // Must invoke finishBatch before emit the +Inf watermark otherwise there are some late events.
-    if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-      invokeFinishBundle();
+  private void maybeEmitWatermark(long watermark) {
+    if (watermark > currentOutputWatermark) {
+      // Must invoke finishBatch before emit the +Inf watermark otherwise there are some late
+      // events.
+      if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        invokeFinishBundle();
+      }
+      LOG.debug("Emitting watermark {}", watermark);
+      currentOutputWatermark = watermark;
+      output.emitWatermark(new Watermark(watermark));
     }
-    output.emitWatermark(new Watermark(watermark));
   }
 
   @Override
-  public void processWatermark2(Watermark mark) throws Exception {
-
-    setCurrentSideInputWatermark(mark.getTimestamp());
+  public final void processWatermark2(Watermark mark) throws Exception {
+    currentSideInputWatermark = mark.getTimestamp();

Review comment:
       Don't we have to emit pushed back data at this point (up to the side input watermark), even when it is not the final watermark?
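
       A toy sketch of the behavior this question is about, not the actual DoFnOperator code. It assumes a pushed-back element can be released purely by comparing its timestamp with the side input watermark, which may not match how the operator really decides that a side input is ready. `PushbackSketch`, `pushBack`, `onSideInputWatermark` and `pushbackHoldMs` are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model only: none of these names are Beam or Flink APIs. */
class PushbackSketch {
  /** A buffered element: payload plus event timestamp in milliseconds. */
  static final class Element {
    final String value;
    final long timestampMs;

    Element(String value, long timestampMs) {
      this.value = value;
      this.timestampMs = timestampMs;
    }
  }

  private final List<Element> pushedBack = new ArrayList<>();
  private long sideInputWatermarkMs = Long.MIN_VALUE;

  /** Buffers an element that cannot be processed yet. */
  public void pushBack(String value, long timestampMs) {
    pushedBack.add(new Element(value, timestampMs));
  }

  /**
   * Advances the side input watermark (it never moves backwards) and releases
   * every buffered element whose timestamp is at or below it. This happens on
   * every watermark, not only on the final one.
   */
  public List<String> onSideInputWatermark(long watermarkMs) {
    sideInputWatermarkMs = Math.max(sideInputWatermarkMs, watermarkMs);
    List<String> released = new ArrayList<>();
    Iterator<Element> it = pushedBack.iterator();
    while (it.hasNext()) {
      Element e = it.next();
      if (e.timestampMs <= sideInputWatermarkMs) {
        released.add(e.value);
        it.remove();
      }
    }
    return released;
  }

  /** Earliest timestamp still buffered, or Long.MAX_VALUE when nothing is held. */
  public long pushbackHoldMs() {
    long hold = Long.MAX_VALUE;
    for (Element e : pushedBack) {
      hold = Math.min(hold, e.timestampMs);
    }
    return hold;
  }
}
```

       In this model an intermediate side input watermark already drains part of the buffer and raises the hold, so the output watermark would not have to wait for the final watermark.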



