Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/17 10:44:03 UTC

[GitHub] [beam] je-ik opened a new pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

je-ik opened a new pull request #13571:
URL: https://github.com/apache/beam/pull/13571


   Fixes [BEAM-11481].


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545114926



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -992,7 +997,23 @@ public void onProcessingTime(InternalTimer<ByteBuffer, TimerData> timer) {
 
   // allow overriding this in ExecutableStageDoFnOperator to set the key context
   protected void fireTimerInternal(ByteBuffer key, TimerData timerData) {
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     fireTimer(timerData);
+    emitWatermarkIfHoldChanged(oldHold);
+  }
+
+  void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+    if (keyCoder != null) {
+      long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs();
+      if (newWatermarkHold > currentWatermarkHold) {
+        try {
+          processInputWatermark(false);
+        } catch (Exception ex) {
+          // should not happen

Review comment:
       Isn't `IllegalStateException` meant to be used for an illegal state, which is a state that should not happen? :)
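   The reviewers suggest replacing the silent `// should not happen` catch with an `IllegalStateException` that carries that text as its message. Below is a minimal sketch of the pattern in plain Java. It assumes a hypothetical `ThrowingRunnable` that stands in for a call such as `processInputWatermark(false)`, which declares `throws Exception`. None of these names are Beam APIs.

   ```java
   import java.io.IOException;

   /** Hypothetical sketch, not Beam code: surface "should not happen" failures loudly. */
   public final class RethrowSketch {
     private RethrowSketch() {}

     /** Like Runnable, but may throw checked exceptions (as processInputWatermark does). */
     @FunctionalInterface
     public interface ThrowingRunnable {
       void run() throws Exception;
     }

     /** Runs the action, wrapping any checked exception in an IllegalStateException. */
     public static void runOrFail(ThrowingRunnable action, String message) {
       try {
         action.run();
       } catch (RuntimeException e) {
         // Unchecked exceptions are propagated unchanged so their type stays visible.
         throw e;
       } catch (Exception e) {
         // A checked exception here indicates a bug: fail with a descriptive message
         // and keep the original exception as the cause instead of swallowing it.
         throw new IllegalStateException(message, e);
       }
     }

     public static void main(String[] args) {
       try {
         runOrFail(
             () -> {
               throw new IOException("simulated failure");
             },
             "Emitting the output watermark after a hold change failed; this should not happen");
       } catch (IllegalStateException e) {
         System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
       }
     }
   }
   ```

   Rethrowing `RuntimeException` unchanged is a design choice in this sketch: it keeps callers from seeing an `IllegalStateException` wrapper around errors that were already unchecked.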







[GitHub] [beam] mxm commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545810318



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -739,9 +741,12 @@ public final void processWatermark1(Watermark mark) throws Exception {
     }
 
     currentInputWatermark = mark.getTimestamp();

Review comment:
       I would like to generalize the existing code instead of adding code paths for every exceptional case, which is bound to be error-prone.







[GitHub] [beam] mxm commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545000289



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -992,7 +997,23 @@ public void onProcessingTime(InternalTimer<ByteBuffer, TimerData> timer) {
 
   // allow overriding this in ExecutableStageDoFnOperator to set the key context
   protected void fireTimerInternal(ByteBuffer key, TimerData timerData) {
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     fireTimer(timerData);
+    emitWatermarkIfHoldChanged(oldHold);
+  }
+
+  void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+    if (keyCoder != null) {
+      long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs();
+      if (newWatermarkHold > currentWatermarkHold) {
+        try {
+          processInputWatermark(false);

Review comment:
       This logic should be removed. All logic which deals with watermark emission should be handled through `processWatermark`.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -739,9 +741,12 @@ public final void processWatermark1(Watermark mark) throws Exception {
     }
 
     currentInputWatermark = mark.getTimestamp();

Review comment:
       We need the following to generalize watermark emission, so that this method can be called from other places:
   
   ```
   // Only ever move the input watermark forward.
   if (mark.getTimestamp() > currentInputWatermark) {
     currentInputWatermark = mark.getTimestamp();
   }
   ```

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -739,9 +741,12 @@ public final void processWatermark1(Watermark mark) throws Exception {
     }
 
     currentInputWatermark = mark.getTimestamp();
+    processInputWatermark(true);
+  }
 
+  private void processInputWatermark(boolean advanceInputWatermark) throws Exception {

Review comment:
       Please remove this if we don't want to scatter the watermark-advancement logic.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -784,7 +789,7 @@ private long computeOutputWatermark(long inputWatermarkHold) {
     return potentialOutputWatermark;
   }
 
-  private void maybeEmitWatermark(long watermark) {
+  void maybeEmitWatermark(long watermark) {

Review comment:
       Why do we remove the `private` modifier?

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -641,8 +641,10 @@ protected final void setBundleFinishedCallback(Runnable callback) {
   @Override
   public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
     checkInvokeStartBundle();
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
+    emitWatermarkIfHoldChanged(oldHold);

Review comment:
       Is this required on every element? I'd rather trigger this only if we set / remove a hold.
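   mxm would rather run the check only when a hold is set or removed, instead of after every element. One way to get that is sketched below under the assumption of a hypothetical `MinHoldTracker` that owns every hold (Beam's state internals are not shown in this thread and may work differently): keep the holds in a sorted multiset and invoke a callback only when the minimum actually changes.

   ```java
   import java.util.TreeMap;
   import java.util.function.LongConsumer;

   /** Hypothetical sketch, not Beam code: notify only when the minimum hold changes. */
   public final class MinHoldTracker {
     /** Sentinel meaning "no hold is set" (a convention assumed by this sketch). */
     public static final long NO_HOLD = Long.MAX_VALUE;

     // Hold timestamp -> number of active holds at that timestamp (a sorted multiset).
     private final TreeMap<Long, Integer> holds = new TreeMap<>();
     private final LongConsumer onMinHoldChanged;

     public MinHoldTracker(LongConsumer onMinHoldChanged) {
       this.onMinHoldChanged = onMinHoldChanged;
     }

     /** Smallest active hold, or NO_HOLD; O(log n) via TreeMap.firstKey(). */
     public long minHold() {
       return holds.isEmpty() ? NO_HOLD : holds.firstKey();
     }

     public void addHold(long timestamp) {
       long before = minHold();
       holds.merge(timestamp, 1, Integer::sum);
       notifyIfChanged(before);
     }

     public void removeHold(long timestamp) {
       long before = minHold();
       Integer count = holds.get(timestamp);
       if (count == null) {
         throw new IllegalStateException("No hold set at " + timestamp);
       }
       if (count == 1) {
         holds.remove(timestamp);
       } else {
         holds.put(timestamp, count - 1);
       }
       notifyIfChanged(before);
     }

     private void notifyIfChanged(long before) {
       long after = minHold();
       if (after != before) {
         // Only here would the operator need to re-evaluate its output watermark.
         onMinHoldChanged.accept(after);
       }
     }
   }
   ```

   The trade-off is that every hold update must go through the tracker, whereas the snapshot-and-compare approach in the diff needs no such hook.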







[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-748095674


   @mxm I refactored the code a little, please have a look now.





[GitHub] [beam] je-ik commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545064833



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -784,7 +789,7 @@ private long computeOutputWatermark(long inputWatermarkHold) {
     return potentialOutputWatermark;
   }
 
-  private void maybeEmitWatermark(long watermark) {
+  void maybeEmitWatermark(long watermark) {

Review comment:
       This was part of an experiment and can be reverted.







[GitHub] [beam] je-ik commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545067052



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -739,9 +741,12 @@ public final void processWatermark1(Watermark mark) throws Exception {
     }
 
     currentInputWatermark = mark.getTimestamp();

Review comment:
       The input watermark is not what this PR is concerned with. That logic did not change; we only need to be able to advance the output watermark when the watermark hold changes.
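   The distinction drawn here (input watermark unchanged, output watermark re-evaluated on hold changes) can be modeled with a small class. This is a hypothetical `OutputWatermarkModel`, not Beam's `DoFnOperator`; it assumes the output watermark may not pass either the input watermark or the earliest hold, and it also applies mxm's forward-only guard on the input watermark.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical model of watermark bookkeeping in a stateful operator; not Beam code. */
   public final class OutputWatermarkModel {
     /** Sentinel meaning "no watermark hold is set" (an assumption of this sketch). */
     public static final long NO_HOLD = Long.MAX_VALUE;

     private long inputWatermark = Long.MIN_VALUE;
     private long minHold = NO_HOLD;
     private long lastEmitted = Long.MIN_VALUE;
     private final List<Long> emitted = new ArrayList<>();

     /** Called when a watermark arrives from upstream. */
     public void onInputWatermark(long watermark) {
       // Only move the input watermark forward, as suggested in the review.
       if (watermark > inputWatermark) {
         inputWatermark = watermark;
       }
       maybeEmitOutputWatermark();
     }

     /** Called after element or timer processing changed the minimum hold. */
     public void onHoldChanged(long newMinHold) {
       minHold = newMinHold;
       // Re-evaluate now instead of waiting for the next input watermark.
       maybeEmitOutputWatermark();
     }

     private void maybeEmitOutputWatermark() {
       // The output watermark is bounded by both the input watermark and the earliest hold.
       long candidate = Math.min(inputWatermark, minHold);
       if (candidate > lastEmitted) {
         lastEmitted = candidate;
         emitted.add(candidate);
       }
     }

     public List<Long> emitted() {
       return emitted;
     }

     public static void main(String[] args) {
       OutputWatermarkModel model = new OutputWatermarkModel();
       model.onHoldChanged(10L); // a DoFn sets a hold at 10
       model.onInputWatermark(100L); // output is held back at 10
       model.onHoldChanged(NO_HOLD); // hold released: output advances to 100 right away
       System.out.println(model.emitted());
     }
   }
   ```

   Running `main` prints `[10, 100]`. Without the call in `onHoldChanged`, the value 100 would only be emitted once another input watermark arrived.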







[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-748108918


   Run Java PreCommit





[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-748097025









[GitHub] [beam] dmvk commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545742844



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -992,7 +997,23 @@ public void onProcessingTime(InternalTimer<ByteBuffer, TimerData> timer) {
 
   // allow overriding this in ExecutableStageDoFnOperator to set the key context
   protected void fireTimerInternal(ByteBuffer key, TimerData timerData) {
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     fireTimer(timerData);
+    emitWatermarkIfHoldChanged(oldHold);
+  }
+
+  void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+    if (keyCoder != null) {
+      long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs();
+      if (newWatermarkHold > currentWatermarkHold) {
+        try {
+          processInputWatermark(false);
+        } catch (Exception ex) {
+          // should not happen

Review comment:
       I just love seeing "This should never happen" messages in logs when debugging ;)







[GitHub] [beam] je-ik commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545065721



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -992,7 +997,23 @@ public void onProcessingTime(InternalTimer<ByteBuffer, TimerData> timer) {
 
   // allow overriding this in ExecutableStageDoFnOperator to set the key context
   protected void fireTimerInternal(ByteBuffer key, TimerData timerData) {
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     fireTimer(timerData);
+    emitWatermarkIfHoldChanged(oldHold);
+  }
+
+  void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+    if (keyCoder != null) {
+      long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs();
+      if (newWatermarkHold > currentWatermarkHold) {
+        try {
+          processInputWatermark(false);

Review comment:
       I disagree: handling watermark emission only in `processWatermark` is what causes the issues.







[GitHub] [beam] je-ik commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545070247



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -739,9 +741,12 @@ public final void processWatermark1(Watermark mark) throws Exception {
     }
 
     currentInputWatermark = mark.getTimestamp();
+    processInputWatermark(true);
+  }
 
+  private void processInputWatermark(boolean advanceInputWatermark) throws Exception {

Review comment:
       I think that is what we must do. We could call `processWatermark` from `processElement`, but that call does work that is not needed in `processElement`. That's why I wrapped only what is needed in `processInputWatermark` (maybe we can find a better name to make this clearer).







[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-748208713


   Run Java PreCommit





[GitHub] [beam] je-ik merged pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik merged pull request #13571:
URL: https://github.com/apache/beam/pull/13571


   





[GitHub] [beam] je-ik commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545067861



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -641,8 +641,10 @@ protected final void setBundleFinishedCallback(Runnable callback) {
   @Override
   public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
     checkInvokeStartBundle();
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
+    emitWatermarkIfHoldChanged(oldHold);

Review comment:
       This is a very cheap call, and it tests exactly whether the hold was set (or rather, reset).
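   The snapshot-and-compare pattern from the diff (`oldHold` captured before `processElement`, then `emitWatermarkIfHoldChanged(oldHold)`) reduces to two lookups and one comparison per element. A minimal sketch with a hypothetical `HoldAdvanceCheck` helper follows; it assumes the minimum-hold lookup is cheap, as stated above, which depends on how `minWatermarkHoldMs()` is implemented (not shown in this thread).

   ```java
   import java.util.function.LongSupplier;

   /** Hypothetical helper, not Beam code: report whether some work moved the minimum hold forward. */
   public final class HoldAdvanceCheck {
     private HoldAdvanceCheck() {}

     public static boolean runAndCheckHoldAdvanced(LongSupplier minHold, Runnable work) {
       long oldHold = minHold.getAsLong(); // one lookup before the work
       work.run();
       // Only an increase can let the output watermark advance, so an
       // unchanged or lowered hold needs no further action.
       return minHold.getAsLong() > oldHold;
     }
   }
   ```

   Like the `newWatermarkHold > currentWatermarkHold` check in the diff, this deliberately ignores a lowered hold, since an output watermark bounded by the hold cannot move forward in that case.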







[GitHub] [beam] dmvk commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r544993985



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -992,7 +997,23 @@ public void onProcessingTime(InternalTimer<ByteBuffer, TimerData> timer) {
 
   // allow overriding this in ExecutableStageDoFnOperator to set the key context
   protected void fireTimerInternal(ByteBuffer key, TimerData timerData) {
+    long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     fireTimer(timerData);
+    emitWatermarkIfHoldChanged(oldHold);
+  }
+
+  void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
+    if (keyCoder != null) {
+      long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs();
+      if (newWatermarkHold > currentWatermarkHold) {
+        try {
+          processInputWatermark(false);
+        } catch (Exception ex) {
+          // should not happen

Review comment:
       Can you please use this comment as the message of the IllegalStateException?
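One way to act on this suggestion is to turn the bare "// should not happen" comment into the message of the thrown exception. In the sketch below, `ThrowingRunnable`, `WatermarkEmission`, `runUnchecked` and the message wording are all hypothetical. They are not Beam code.

```java
/** Hypothetical helper, not part of Beam: a Runnable that may throw checked exceptions. */
interface ThrowingRunnable {
  void run() throws Exception;
}

class WatermarkEmission {
  /**
   * Runs an emission step that declares a checked exception and turns any failure into an
   * IllegalStateException, so the "should not happen" reasoning travels with the error
   * instead of living only in a code comment.
   */
  static void runUnchecked(ThrowingRunnable emission) {
    try {
      emission.run();
    } catch (Exception ex) {
      throw new IllegalStateException(
          "Emitting the output watermark after a watermark hold change should not fail", ex);
    }
  }
}
```

Keeping the original exception as the cause preserves its stack trace if the "impossible" case ever does occur.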







[GitHub] [beam] dmvk commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
dmvk commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-747362283


   Can you please add a new test for this to DoFnOperatorTest?





[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-748108527


   Run Python_PVR_Flink PreCommit





[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-747507188


   @dmvk Test added. @mxm, did I answer your questions?





[GitHub] [beam] mxm commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-748184560


   Feel free to proceed however you like. No need to block this on me. After all, it's a good fix!





[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-747540982


   Run Flink ValidatesRunner





[GitHub] [beam] dmvk commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
dmvk commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-747368390


   My best guess is that Impulse advances the input watermark from MIN to MAX, and we rely on the watermark hold to handle the progression... In that case, SDF has probably never worked properly on the Flink runner 🤔
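The reviewer's guess can be made concrete with a toy model. Suppose the input watermark has already jumped to MAX. Then no further input watermark arrives, and only hold changes can move the output watermark. `ImpulseWatermarkModel` and its parameters are hypothetical. The model assumes the output watermark is min(input watermark, minimum hold), with `Long.MAX_VALUE` meaning "no hold".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the guessed scenario; not Beam or Flink code. */
class ImpulseWatermarkModel {
  /**
   * @param holds minimum watermark hold after each processing step (Long.MAX_VALUE = no hold)
   * @param emitOnHoldChange whether an advancing hold triggers watermark emission
   * @return output watermarks emitted downstream, in order
   */
  static List<Long> emittedWatermarks(long[] holds, boolean emitOnHoldChange) {
    final long input = Long.MAX_VALUE; // already advanced from MIN to MAX
    List<Long> emitted = new ArrayList<>();
    // The single input-watermark arrival emits min(input, first hold).
    long output = Math.min(input, holds[0]);
    emitted.add(output);
    for (int i = 1; i < holds.length; i++) {
      long candidate = Math.min(input, holds[i]);
      boolean holdAdvanced = holds[i] > holds[i - 1];
      // No new input watermark will come, so a hold change is the only remaining trigger.
      if (emitOnHoldChange && holdAdvanced && candidate > output) {
        output = candidate;
        emitted.add(output);
      }
    }
    return emitted;
  }
}
```

With holds advancing 0, 5, 10 and then released, the model emits only 0 when hold changes are ignored. Downstream would never see the watermark advance, which matches the "never worked properly" suspicion. With emission on hold change, it emits 0, 5, 10 and MAX.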





[GitHub] [beam] je-ik commented on pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #13571:
URL: https://github.com/apache/beam/pull/13571#issuecomment-747543133


   Run Java Flink PortableValidatesRunner Streaming





[GitHub] [beam] mxm commented on a change in pull request #13571: [BEAM-11481] emit output watermark on watermark hold change

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13571:
URL: https://github.com/apache/beam/pull/13571#discussion_r545808663



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -739,9 +741,12 @@ public final void processWatermark1(Watermark mark) throws Exception {
     }
 
     currentInputWatermark = mark.getTimestamp();

Review comment:
       You would need this change in order to go through the regular watermark emission code without changing the most recently seen input watermark.
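One possible reading of this suggestion is to separate updating the input watermark from the shared emission logic. A new upstream watermark would then update the input watermark and emit. A hold change would reuse the same emission path and leave the input watermark alone. The sketch below follows that reading. `WatermarkEmitter` and its methods are hypothetical names, not the actual DoFnOperator methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; field and method names are illustrative, not DoFnOperator's. */
class WatermarkEmitter {
  private long currentInputWatermark = Long.MIN_VALUE;
  private long currentOutputWatermark = Long.MIN_VALUE;
  private long minHold = Long.MAX_VALUE;
  private final List<Long> emitted = new ArrayList<>();

  /** Upstream watermark arrived: the only place that updates the input watermark. */
  void onInputWatermark(long mark) {
    currentInputWatermark = mark;
    emitOutputWatermark();
  }

  /** Hold changed: reuse the regular emission path, leaving the input watermark untouched. */
  void onHoldChanged(long newMinHold) {
    minHold = newMinHold;
    emitOutputWatermark();
  }

  /** Shared emission logic: output = min(input watermark, minimum hold), never moving back. */
  private void emitOutputWatermark() {
    long candidate = Math.min(currentInputWatermark, minHold);
    if (candidate > currentOutputWatermark) {
      currentOutputWatermark = candidate;
      emitted.add(candidate);
    }
  }

  long currentInputWatermark() {
    return currentInputWatermark;
  }

  List<Long> emitted() {
    return emitted;
  }
}
```

Because only `onInputWatermark` writes `currentInputWatermark`, re-running emission after a hold change cannot move the last seen input watermark.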



