Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/12 18:24:45 UTC

[GitHub] [beam] je-ik opened a new pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

je-ik opened a new pull request #12551:
URL: https://github.com/apache/beam/pull/12551


   This seems to fix the issue in my pipeline. I'm not yet sure whether it is the best solution; this PR is for discussion.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mxm edited a comment on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm edited a comment on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673478937


   I'm not following. How is adding an unnecessarily restrictive assertion proof that there is a bug? At any point in time there can be multiple timers that set an output timestamp hold on the same timestamp. Please keep in mind that multiple keys can each set the same timer, each with a different output timestamp. That does not indicate a bug.
   
   The priority queue is a global view over all keys. The watermark will not be advanced past `t` if, for any key, there exists a timer with the output timestamp `t`.
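
To make these semantics concrete, here is a minimal sketch using plain `java.util` (the class `GlobalHold` and its methods are hypothetical, not the actual `DoFnOperator` code). A single operator-wide queue holds one entry per active timer that carries an output timestamp; two keys holding at the same timestamp are two legitimate entries, and the hold is only released once both are cleared.

```java
import java.util.PriorityQueue;

/** Hypothetical model of an operator-wide output watermark hold across all keys. */
final class GlobalHold {
  // One entry per active timer with an output timestamp, regardless of its key.
  private final PriorityQueue<Long> outputTimestamps = new PriorityQueue<>();

  /** Registers the output timestamp of a newly set timer (for any key). */
  void onTimerSet(long outputTimestamp) {
    outputTimestamps.add(outputTimestamp);
  }

  /** Releases exactly one entry when a timer fires or is deleted. */
  void onTimerCleared(long outputTimestamp) {
    outputTimestamps.remove(outputTimestamp); // removes a single matching instance
  }

  /** The output watermark may not advance past this value; Long.MAX_VALUE means no hold. */
  long hold() {
    Long min = outputTimestamps.peek();
    return min == null ? Long.MAX_VALUE : min;
  }
}
```

With timers for keys "a" and "b" both holding at 100, clearing the timer for "a" leaves the hold at 100 until the timer for "b" is cleared as well.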





[GitHub] [beam] je-ik edited a comment on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik edited a comment on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673488800


   Agreed. Multiple different timers can indeed set the same output timestamp (different in the sense that it is either a completely different timer, or the same timer for a different key). But the assertion checks that *no timer with the same name and namespace is already set*, that is, the same timer for the same key. That should be impossible, because the timer should be either:
    - completely new (so there cannot be any record for it), or
    - updated, in which case it should be cancelled first (and then set again as new).
   
   The fact that the precondition fails demonstrates why this patch solves an issue: in the original code, the output timestamp for the exact same timer would have been inserted into the priority queue twice but removed only once (when updated or fired), leaving a stale output timestamp in the priority queue that blocks the pipeline's progress forever.
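
The failure mode described above can be reproduced in isolation. The sketch below is hypothetical (plain `java.util`, invented names `HoldTracking`, `TimerKey`, `NaiveHold` and `TrackedHold`; not the Flink runner's code) and assumes that the same (key, namespace, timer id) really is registered twice without an intermediate cancel. The naive queue leaks one hold, while remembering each timer's current output timestamp makes re-registration idempotent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;

final class HoldTracking {
  /** Identity of one timer: key, namespace and timer id (hypothetical type). */
  static final class TimerKey {
    final String key;
    final String namespace;
    final String timerId;

    TimerKey(String key, String namespace, String timerId) {
      this.key = key;
      this.namespace = namespace;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TimerKey)) return false;
      TimerKey other = (TimerKey) o;
      return key.equals(other.key)
          && namespace.equals(other.namespace)
          && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, namespace, timerId);
    }
  }

  /** Naive variant: every registration adds an entry, even for an already-set timer. */
  static final class NaiveHold {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();

    void set(TimerKey timer, long outputTs) { queue.add(outputTs); }

    void clear(TimerKey timer, long outputTs) { queue.remove(outputTs); }

    long hold() {
      Long min = queue.peek();
      return min == null ? Long.MAX_VALUE : min;
    }
  }

  /** Idempotent variant: remembers each timer's current output timestamp and replaces it. */
  static final class TrackedHold {
    private final Map<TimerKey, Long> current = new HashMap<>();
    private final PriorityQueue<Long> queue = new PriorityQueue<>();

    void set(TimerKey timer, long outputTs) {
      Long previous = current.put(timer, outputTs);
      if (previous != null) queue.remove(previous); // drop the stale entry first
      queue.add(outputTs);
    }

    void clear(TimerKey timer) {
      Long previous = current.remove(timer);
      if (previous != null) queue.remove(previous);
    }

    long hold() {
      Long min = queue.peek();
      return min == null ? Long.MAX_VALUE : min;
    }
  }
}
```

The `TrackedHold` variant pays for idempotence with one map entry per active timer.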





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673334927


   > Hey @je-ik! Are you sure that problem exists in the latest version? This feature has been tested very extensively in production. Unless you can point out an error in the implementation, I'd rather not change anything.
   
   100% sure. I'm using the current snapshot. This patch fixes an issue I'm seeing in a production pipeline, where the pipeline stops executing within minutes of being restarted. It might be due to some special conditions in the pipeline, but the issue is definitely real.
   





[GitHub] [beam] mxm commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r470650517



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1227,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final TreeMultiset<Long> outputTimestamps = TreeMultiset.create();

Review comment:
       I wasn't familiar with the internals of TreeMultiset, but if that's the case, it sounds excellent! :)







[GitHub] [beam] je-ik commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r470638431



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1227,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final TreeMultiset<Long> outputTimestamps = TreeMultiset.create();

Review comment:
       Pushed










[GitHub] [beam] je-ik commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r470627537



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1227,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final TreeMultiset<Long> outputTimestamps = TreeMultiset.create();

Review comment:
       I don't know the internal details of TreeMultiset, but I'd expect its performance to be roughly the same as a Map<Long, Integer>. Removing outputTimestamps entirely seems to be the best option. I'll try that and let you know.










[GitHub] [beam] je-ik commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r470629668



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1227,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final TreeMultiset<Long> outputTimestamps = TreeMultiset.create();

Review comment:
       Maybe we could replace the TreeMap<Long, Integer> with the TreeMultiset?







[GitHub] [beam] mxm commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r470633121



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1227,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final TreeMultiset<Long> outputTimestamps = TreeMultiset.create();

Review comment:
       Why would we do this? TreeMap<Long, Integer> seems like the better data structure because it stores the data compactly.
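
For comparison, a counting multiset over `TreeMap<Long, Integer>` needs one entry per distinct output timestamp rather than one per timer, and supports add, remove and minimum in O(log n). The class below is a hypothetical sketch using only `java.util`; Guava's `TreeMultiset` offers comparable counted semantics behind a richer interface.

```java
import java.util.TreeMap;

/** Hypothetical counting multiset of output timestamps: one entry per distinct timestamp. */
final class TimestampCounts {
  private final TreeMap<Long, Integer> counts = new TreeMap<>();

  /** Adds one occurrence of ts. */
  void add(long ts) {
    counts.merge(ts, 1, Integer::sum);
  }

  /** Removes one occurrence of ts; returns false if ts was not present. */
  boolean remove(long ts) {
    Integer c = counts.get(ts);
    if (c == null) return false;
    if (c == 1) {
      counts.remove(ts); // last occurrence: drop the entry entirely
    } else {
      counts.put(ts, c - 1);
    }
    return true;
  }

  /** Smallest held timestamp, or Long.MAX_VALUE if nothing is held. */
  long min() {
    return counts.isEmpty() ? Long.MAX_VALUE : counts.firstKey();
  }

  /** Number of map entries actually stored. */
  int distinctTimestamps() {
    return counts.size();
  }
}
```

A thousand timers holding at the same timestamp cost a single map entry here, whereas a `PriorityQueue<Long>` stores a thousand elements.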







[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673348164


   As I said, this patch fixes the pipeline without any change to the user code. The fire timestamp is shifted by a fixed amount of several tens of seconds. I don't think there is any issue with how this feature is used.





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673427301


   @mxm I added this check: https://github.com/apache/beam/pull/12551/commits/8ea092afefe8e3602d73c591e92a7ff5250684f5
   and it resulted in the following trace:
   ```
   java.lang.RuntimeException: Failed to set timer
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.setTimer(DoFnOperator.java:1398)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.setTimer(DoFnOperator.java:1373)
   	at org.apache.beam.runners.core.StatefulDoFnRunner.setupFlushTimer(StatefulDoFnRunner.java:288)
   	at org.apache.beam.runners.core.StatefulDoFnRunner.processElementOrdered(StatefulDoFnRunner.java:180)
   	at org.apache.beam.runners.core.StatefulDoFnRunner.processElement(StatefulDoFnRunner.java:147)
   	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:604)
   	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
   	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
   	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
   	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
   	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
   	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
   	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.IllegalStateException
   	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:491)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.lambda$onNewEventTimer$0(DoFnOperator.java:1287)
   	at java.util.Map.compute(Map.java:1093)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.onNewEventTimer(DoFnOperator.java:1283)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.registerTimer(DoFnOperator.java:1408)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.setTimer(DoFnOperator.java:1396)
   	... 17 more
   ```
   That was to prove that an output timestamp really is already set for one specific timer, i.e. that it is added multiple times. From the code, I still do not see how this can happen. Yes, the timer is set from StatefulDoFnRunner (due to @RequiresTimeSortedInput), but given that processing is single-threaded, I'm still a little lost. And it seems that the same can happen to ordinary user-code timers.





[GitHub] [beam] je-ik merged pull request #12551: [BEAM-10691] Use FlinkStateInternals#addWatermarkHoldUsage for timer output timestamp

Posted by GitBox <gi...@apache.org>.
je-ik merged pull request #12551:
URL: https://github.com/apache/beam/pull/12551


   





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673543606


   That would be the source of my misunderstanding: I thought the namespace contained the key as well. I will look into that.
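
A minimal illustration of how that misunderstanding can make the precondition fire without any real double registration. Assuming the id checked by the precondition is built only from the timer id and namespace, an operator-wide set of ids sees the same timer set for two different keys as a duplicate. The helpers below are hypothetical stand-ins for the PR's `getContextTimerId`, and the separators are chosen arbitrarily for illustration.

```java
import java.util.HashSet;
import java.util.Set;

final class ContextIds {
  /** Hypothetical id built only from timer id and namespace (no key). */
  static String contextTimerId(String timerId, String namespace) {
    return timerId + "+" + namespace;
  }

  /** Hypothetical id that also includes the key. */
  static String contextTimerIdWithKey(String key, String timerId, String namespace) {
    return key + "/" + contextTimerId(timerId, namespace);
  }

  /**
   * Registers each {key, timerId, namespace} triple in one operator-wide set and
   * returns true if any id is seen twice (what a checkState on the set would reject).
   */
  static boolean hasCollision(String[][] timers, boolean includeKey) {
    Set<String> seen = new HashSet<>();
    for (String[] t : timers) {
      String id = includeKey ? contextTimerIdWithKey(t[0], t[1], t[2]) : contextTimerId(t[1], t[2]);
      if (!seen.add(id)) {
        return true;
      }
    }
    return false;
  }
}
```

For the same timer id and namespace set under keys "user-1" and "user-2", the key-less ids collide while the keyed ids do not.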





[GitHub] [beam] mxm commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673985067


   Sure, no problem! Could you share some more context on the pipeline you're running and which SDK you're using?





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673037113


   Hi @mxm, can you please help me look into this? This PR fixes an issue I noticed, but I'm not sure why there seem to be timers that are not cleared from the original queue. Moreover, I'm not sure why there can be a null value in the new map (see the FIXME in the code). With the precondition check, some ValidatesRunner tests fail.





[GitHub] [beam] je-ik commented on pull request #12551: [BEAM-10691] Use FlinkStateInternals#addWatermarkHoldUsage for timer output timestamp

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-674098703


   Run Flink ValidatesRunner





[GitHub] [beam] mxm edited a comment on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm edited a comment on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673621651


   Yes, that timer map uses Flink's keyed state backend, which always takes the currently active key into account. The map is necessary to allow cancelling the currently active timer when we set a new timer for that key; Flink doesn't support that out of the box unless you know the timestamp the timer was previously set to.
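
The constraint described above can be modelled with a toy timer service in which a timer is identified by its key, timer id and timestamp, so deleting it requires the exact timestamp. This is a simplified, hypothetical model (class names invented, not Flink's `TimerService` API); the nested map stands in for the keyed map state, where the key would be implicit in the state backend.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-key bookkeeping that lets a new setTimer replace the previous one. */
final class ReplacingTimers {

  /** Toy stand-in for a timer service: deletion needs the exact registered timestamp. */
  static final class ToyTimerService {
    private final Set<String> registered = new HashSet<>();

    void register(String key, String timerId, long ts) {
      registered.add(key + "/" + timerId + "@" + ts);
    }

    void delete(String key, String timerId, long ts) {
      registered.remove(key + "/" + timerId + "@" + ts);
    }

    int size() {
      return registered.size();
    }
  }

  private final ToyTimerService service = new ToyTimerService();
  // key -> (timer id -> timestamp the timer is currently set to)
  private final Map<String, Map<String, Long>> currentTimestamp = new HashMap<>();

  void setTimer(String key, String timerId, long ts) {
    Map<String, Long> perKey = currentTimestamp.computeIfAbsent(key, k -> new HashMap<>());
    Long previous = perKey.put(timerId, ts);
    if (previous != null) {
      // Without this lookup the old timer could not be cancelled: its timestamp is unknown.
      service.delete(key, timerId, previous);
    }
    service.register(key, timerId, ts);
  }

  int registeredTimers() {
    return service.size();
  }
}
```

Re-setting the "flush" timer for "user-1" moves it rather than adding a second registration, so the service ends up with one timer per key.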
   
   I added that note because somebody tried to use this map to calculate the hold, which does not work. You can iterate over all keys (we do that during restore), but that is a very expensive operation you don't want to perform per element.
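
The cost difference can be sketched as follows (hypothetical class `HoldComputation`, plain `java.util`, one timer per key for brevity). Deriving the hold from the per-key map means scanning every key, whereas an index maintained on each `set` answers in O(log n). Both return the same value; only the cost differs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model contrasting a full scan with an incrementally maintained index. */
final class HoldComputation {
  // key -> output timestamp of that key's (single) timer
  private final Map<String, Long> perKey = new HashMap<>();
  // counting index over all output timestamps, updated on every set
  private final TreeMap<Long, Integer> index = new TreeMap<>();

  void set(String key, long outputTs) {
    Long previous = perKey.put(key, outputTs);
    if (previous != null) {
      decrement(previous); // the key's old timestamp no longer holds
    }
    index.merge(outputTs, 1, Integer::sum);
  }

  private void decrement(long ts) {
    // Returning null from the remapping function removes the entry.
    index.computeIfPresent(ts, (k, c) -> c == 1 ? null : c - 1);
  }

  /** O(number of keys): what iterating the keyed map would cost on every element. */
  long holdByScan() {
    long min = Long.MAX_VALUE;
    for (long ts : perKey.values()) {
      min = Math.min(min, ts);
    }
    return min;
  }

  /** O(log n): read the minimum from the maintained index. */
  long holdFromIndex() {
    return index.isEmpty() ? Long.MAX_VALUE : index.firstKey();
  }
}
```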





[GitHub] [beam] mxm commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469769025



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1277,7 +1280,13 @@ private void onNewEventTimer(TimerData newTimer) {
           "Timer with id %s is not an event time timer!",
           newTimer.getTimerId());
       if (timerUsesOutputTimestamp(newTimer)) {
-        outputTimestampQueue.add(newTimer.getOutputTimestamp().getMillis());
+        outputTimestamps.compute(
+            newTimer.getOutputTimestamp().getMillis(),
+            (k, v) -> {
+              Set<String> timerIds = v == null ? new HashSet<>() : v;
+              timerIds.add(getContextTimerId(newTimer.getTimerId(), newTimer.getNamespace()));

Review comment:
       IMHO there is no need to store the timer id here.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1230,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final SortedMap<Long, Set<String>> outputTimestamps = new TreeMap<>();

Review comment:
       This is very expensive in terms of memory.







[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-674000820


   Java SDK. The only special thing about the pipeline is that it uses the mentioned `@RequiresTimeSortedInput`. The stateful DoFn itself doesn't actually use any timers; the only timers are the ones set by `StatefulDoFnRunner`. But there seems to be no actual difference between how timers are set in user code and in `StatefulDoFnRunner`.





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-674064055


   Run Flink ValidatesRunner





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673564429


   @mxm How is it possible that `getContextTimerId` (used in the precondition check and in the logic around it) does not contain the key, if `String contextTimerId = getContextTimerId(timer.getTimerId(), timer.getNamespace())` is used for registering and cancelling timers? There also seems to be an issue with a missing timer family id, but that is not related.





[GitHub] [beam] mxm commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469777963



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1277,7 +1280,13 @@ private void onNewEventTimer(TimerData newTimer) {
           "Timer with id %s is not an event time timer!",
           newTimer.getTimerId());
       if (timerUsesOutputTimestamp(newTimer)) {
-        outputTimestampQueue.add(newTimer.getOutputTimestamp().getMillis());
+        outputTimestamps.compute(
+            newTimer.getOutputTimestamp().getMillis(),
+            (k, v) -> {
+              Set<String> timerIds = v == null ? new HashSet<>() : v;
+              timerIds.add(getContextTimerId(newTimer.getTimerId(), newTimer.getNamespace()));

Review comment:
       Duplicates are OK. If multiple timers are set with different ids, or even for the same key, duplicates are expected. We could use a TreeMap instead of a PriorityQueue, like we do in FlinkStateInternals for the WatermarkHoldState, but I don't see anything wrong with the implementation here. 
   
   You likely have a problem in your application code, i.e. you set a timer output timestamp which holds back the watermark but your fire timestamp has not been reached by the watermark yet. Please investigate that first. Also note #12531 which was a bug in the Python SDK.
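A rough model of this failure mode, assuming the output watermark is the minimum of the input watermark and the smallest output timestamp among pending timers, and that a timer releases its hold only once the input watermark reaches its fire timestamp. `TimerHoldModel` is a hypothetical name; this is not the `DoFnOperator` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Beam/Flink code): pending event-time timers hold the
// output watermark at their output timestamp until the input watermark reaches
// their fire timestamp.
class TimerHoldModel {
  static final class Timer {
    final long fireTimestamp;
    final long outputTimestamp;

    Timer(long fireTimestamp, long outputTimestamp) {
      this.fireTimestamp = fireTimestamp;
      this.outputTimestamp = outputTimestamp;
    }
  }

  private final List<Timer> pending = new ArrayList<>();

  void setTimer(long fireTimestamp, long outputTimestamp) {
    pending.add(new Timer(fireTimestamp, outputTimestamp));
  }

  /** Fires every timer whose fire timestamp the input watermark has reached, then returns the output watermark. */
  long advance(long inputWatermark) {
    pending.removeIf(t -> t.fireTimestamp <= inputWatermark);
    long hold = Long.MAX_VALUE;
    for (Timer t : pending) {
      hold = Math.min(hold, t.outputTimestamp);
    }
    return Math.min(inputWatermark, hold);
  }
}
```

With a timer set as `setTimer(10_000L, 100L)`, the output watermark stays at 100 for every input watermark below 10 000 and jumps forward only once the timer fires.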







[GitHub] [beam] mxm commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673621651


   Yes, that timer map uses Flink's keyed state backend, which always takes the currently active key into account. The map is necessary to allow cancelling the currently active timer when we set a new timer for that key. Flink doesn't support that out of the box unless you know the timestamp the previous timer was set to.
   
   I added that note because somebody tried to use this map to calculate the hold, which does not work. You can iterate over all keys (we do that during restore), but that is a very expensive operation you don't want to perform per key.





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673662843


   :+1: understood. I'll fix the PR so that it takes the key into account and test that tomorrow. Thanks for the help!





[GitHub] [beam] je-ik edited a comment on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik edited a comment on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673334927


   > Hey @je-ik! Are you sure that problem exists in the latest version? This feature has been tested very extensively in production. Unless you can point out an error in the implementation, I'd rather not change anything.
   
   100% sure. I'm using the current snapshot. This patch fixes the issue I'm seeing in a production pipeline, where the pipeline stops executing within minutes of being restarted. It might be caused by some special conditions in the pipeline, but the issue is definitely real.
   





[GitHub] [beam] mxm commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673336167


   It's likely caused by using the output timestamp feature in the wrong way. If you use a fire timestamp too far in the future, it can easily stall the entire pipeline. It's a pretty dangerous feature which should be avoided if possible.





[GitHub] [beam] mxm commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673497249


   Timers are set per key, not per partition. The same timer in the same namespace can be set multiple times for different keys. The assertion doesn't check the key.
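The effect of leaving the key out of the id can be sketched as follows. The id formats, the timer id `gc`, the namespace `window-1` and the class name are all hypothetical; the point is only that two keys registering the same timer in the same namespace collapse into one set entry when the id has no key component.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration: tracking output timestamps by a timer id that omits
// the key under-counts timers, because different keys can set the same timer id
// in the same namespace.
class KeylessTimerIdTracking {
  static String contextTimerId(String timerId, String namespace) {
    return timerId + "/" + namespace; // no key component
  }

  static String keyedContextTimerId(String key, String timerId, String namespace) {
    return key + "/" + timerId + "/" + namespace;
  }

  /** Registers timer "gc" in namespace "window-1" with output timestamp 100 for each key. */
  static SortedMap<Long, Set<String>> track(String[] keys, boolean includeKey) {
    SortedMap<Long, Set<String>> outputTimestamps = new TreeMap<>();
    for (String key : keys) {
      String id =
          includeKey
              ? keyedContextTimerId(key, "gc", "window-1")
              : contextTimerId("gc", "window-1");
      outputTimestamps.computeIfAbsent(100L, t -> new HashSet<>()).add(id);
    }
    return outputTimestamps;
  }

  static int trackedTimers(SortedMap<Long, Set<String>> outputTimestamps) {
    return outputTimestamps.values().stream().mapToInt(Set::size).sum();
  }
}
```

With the key-less id, the second key's `add` is a no-op, so the first removal for that timestamp can empty the set while the other key's timer is still pending.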





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-674059727


   @mxm I think I figured it out. The current commit https://github.com/apache/beam/pull/12551/commits/fe75b7e01d55a0c24486b032330d26d66b57f9c2 works for my pipeline. I think the explanation is that the pipeline was not really completely stuck, only so slow that it looked stuck. `PriorityQueue#remove(Object)` has O(N) complexity, and when there are very many timers (with nearly the same output timestamp), it would effectively cause the pipeline to stop running.





[GitHub] [beam] mxm closed pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm closed pull request #12551:
URL: https://github.com/apache/beam/pull/12551


   





[GitHub] [beam] mxm commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r470623744



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1227,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final TreeMultiset<Long> outputTimestamps = TreeMultiset.create();

Review comment:
       We could also consider making this even more efficient by using a `TreeMap<Long, Integer>` where the key is the output timestamp and the value the number of timers which have set it, similar to how it's done in `FlinkStateInternals` for the watermark holds.
   
   Further, we could remove `outputTimestamps` entirely and simply use `stateInternals.addWatermarkHoldUsage(output_timestamp)` and `stateInternals.removeWatermarkHoldUsage(output_timestamp)`.
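A minimal sketch of the `TreeMap<Long, Integer>` idea, assuming a hold is simply reference-counted per output timestamp. `HoldCounter`, `addHoldUsage` and `removeHoldUsage` are hypothetical names; this is not the `FlinkStateInternals` implementation behind `addWatermarkHoldUsage`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reference-counted hold tracker: output timestamp -> number of
// timers currently holding the watermark at that timestamp.
class HoldCounter {
  private final TreeMap<Long, Integer> holds = new TreeMap<>();

  /** O(log n): increments the usage count of a timestamp. */
  void addHoldUsage(long timestamp) {
    holds.merge(timestamp, 1, Integer::sum);
  }

  /** O(log n): decrements the count and drops the entry when it reaches zero. */
  void removeHoldUsage(long timestamp) {
    holds.computeIfPresent(timestamp, (ts, count) -> count == 1 ? null : count - 1);
  }

  /** Smallest held timestamp, or Long.MAX_VALUE if nothing holds the watermark. */
  long minHold() {
    Map.Entry<Long, Integer> first = holds.firstEntry();
    return first == null ? Long.MAX_VALUE : first.getKey();
  }
}
```

Unlike `PriorityQueue#remove(Object)`, which scans the queue linearly, each update here touches a single tree entry, so many timers sharing the same output timestamp cost one entry rather than one queue element each. Guava's `TreeMultiset` likewise keeps a count per distinct element.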







[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-674063978


   Run Portable_Python PreCommit





[GitHub] [beam] mxm commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673985239


   (sorry, wrong button)





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673351917


   Another observation is that this happens when there are many timers that fire at the same time. The pipeline operates in bootstrap mode: the watermark is updated after reading 1 hour of data from batch storage, so it "hops" on 1-hour boundaries, but the pipeline uses 30s windows. So many windows get closed at once, and that's where this happens. It does not seem to happen when the pipeline reads real-time data and updates the watermark appropriately. I think there might be some sort of race condition somewhere, but so far I haven't figured out where exactly.
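The size of each burst can be estimated with simple arithmetic. This sketch assumes windows aligned to the start of the hop and a watermark that jumps by exactly one hour; `WatermarkHopBurst` is a hypothetical name.

```java
// Hypothetical arithmetic for the bootstrap scenario: how many fixed-window
// end-of-window timers per key become eligible when the watermark jumps forward.
class WatermarkHopBurst {
  static long windowsClosedPerKey(long hopMillis, long windowMillis) {
    long closed = 0;
    // Window i covers [i * windowMillis, (i + 1) * windowMillis); its end-of-window
    // timer becomes eligible once the watermark reaches the window's end.
    for (long end = windowMillis; end <= hopMillis; end += windowMillis) {
      closed++;
    }
    return closed;
  }
}
```

For a one-hour hop and 30s windows, `windowsClosedPerKey(3_600_000L, 30_000L)` gives 120 windows per key, so the number of timers firing together scales as 120 times the number of active keys.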





[GitHub] [beam] je-ik commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469774225



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1230,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final SortedMap<Long, Set<String>> outputTimestamps = new TreeMap<>();

Review comment:
       Agreed, that's why I don't particularly like this solution and would like to come up with a different one.







[GitHub] [beam] je-ik commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469774735



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1277,7 +1280,13 @@ private void onNewEventTimer(TimerData newTimer) {
           "Timer with id %s is not an event time timer!",
           newTimer.getTimerId());
       if (timerUsesOutputTimestamp(newTimer)) {
-        outputTimestampQueue.add(newTimer.getOutputTimestamp().getMillis());
+        outputTimestamps.compute(
+            newTimer.getOutputTimestamp().getMillis(),
+            (k, v) -> {
+              Set<String> timerIds = v == null ? new HashSet<>() : v;
+              timerIds.add(getContextTimerId(newTimer.getTimerId(), newTimer.getNamespace()));

Review comment:
       It should not. But in some cases, a timer output timestamp seems to appear in the queue _twice_ (for the same timer) and is removed only _once_, which causes the pipeline to get stuck.
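The suspected symptom can be reproduced in isolation. This is a hypothetical sketch of the hypothesis stated here, not a confirmed root cause; `outputTimestampQueue` mirrors the field name in the diff, while `UnbalancedHoldDemo` and the timestamp value 100 are made up for illustration.

```java
import java.util.PriorityQueue;

// Hypothetical reproduction: if a timer's output timestamp is added to the hold
// queue twice but removed only once, the leftover entry keeps holding the
// watermark after the timer has fired.
class UnbalancedHoldDemo {
  static long outputWatermark(long inputWatermark, boolean duplicateAdd) {
    PriorityQueue<Long> outputTimestampQueue = new PriorityQueue<>();
    long outputTimestamp = 100L;
    outputTimestampQueue.add(outputTimestamp); // timer is set
    if (duplicateAdd) {
      outputTimestampQueue.add(outputTimestamp); // same timer recorded a second time
    }
    outputTimestampQueue.remove(outputTimestamp); // timer fires: one entry removed
    Long hold = outputTimestampQueue.peek();
    return hold == null ? inputWatermark : Math.min(inputWatermark, hold);
  }
}
```

With balanced updates the output watermark follows the input watermark; with the duplicate add it stays at 100 no matter how far the input watermark advances.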







[GitHub] [beam] je-ik edited a comment on pull request #12551: [DISCUSS] {BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik edited a comment on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673037113


   Hi @mxm, can you please help me with this? This PR fixes an issue I noticed, but I'm not sure why there seem to be timers that are not cleared from the original queue. Moreover, I'm not sure why there can be a null value in the new map (see FIXME in the code). With the precondition check, some ValidatesRunner tests fail.





[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673567201


   Found that.
   ```
        * <p>CAUTION: This map is scoped by the current active key. Do not attempt to perform any
        * calculations which span across keys.
   ```





[GitHub] [beam] je-ik commented on pull request #12551: [BEAM-10691] Use FlinkStateInternals#addWatermarkHoldUsage for timer output timestamp

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-674098964


   Thanks. I squashed the commits and will merge this when the checks pass. Thanks for helping solve this!





[GitHub] [beam] je-ik commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r470636567



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1227,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final TreeMultiset<Long> outputTimestamps = TreeMultiset.create();

Review comment:
       TreeMultiset does the same, and it saves some code and some performance: the value is stored as an array of length 1, so adding an already existing item to the set is just a get() and an in-place increment, while the TreeMap must do a get and a replace. I think the biggest argument here is that the code is more readable, though. I'll update the PR in a sec so that you can see.







[GitHub] [beam] je-ik commented on pull request #12551: [DISCUSS] {BEAM-10691] keep track of timers creating watermark hold

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #12551:
URL: https://github.com/apache/beam/pull/12551#issuecomment-673038647


   Run Flink ValidatesRunner

