Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/13 08:03:26 UTC

[GitHub] [beam] mxm commented on a change in pull request #12551: [DISCUSS] [BEAM-10691] keep track of timers creating watermark hold

mxm commented on a change in pull request #12551:
URL: https://github.com/apache/beam/pull/12551#discussion_r469769025



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1277,7 +1280,13 @@ private void onNewEventTimer(TimerData newTimer) {
           "Timer with id %s is not an event time timer!",
           newTimer.getTimerId());
       if (timerUsesOutputTimestamp(newTimer)) {
-        outputTimestampQueue.add(newTimer.getOutputTimestamp().getMillis());
+        outputTimestamps.compute(
+            newTimer.getOutputTimestamp().getMillis(),
+            (k, v) -> {
+              Set<String> timerIds = v == null ? new HashSet<>() : v;
+              timerIds.add(getContextTimerId(newTimer.getTimerId(), newTimer.getNamespace()));

Review comment:
       IMHO there is no need to store the timer id here.

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1226,7 +1230,7 @@ public TimerInternals timerInternals() {
      * fire time of the timer. Used for calculating the output watermark hold. This avoids fetching
      * timer data from the state backend which is expensive if done for each timer.
      */
-    private final PriorityQueue<Long> outputTimestampQueue;
+    private final SortedMap<Long, Set<String>> outputTimestamps = new TreeMap<>();

Review comment:
       This is very expensive in terms of memory: it keeps a Set<String> of timer ids for every output timestamp.
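
To illustrate the two review comments above, here is a minimal sketch of one possible lighter-weight structure: a sorted map that keeps only a count of timers per output timestamp instead of a set of timer ids. This is an assumption for discussion, not the approach taken in the pull request or anything that exists in Beam; the class `OutputTimestampCounts` and its method names are hypothetical.

```java
import java.util.TreeMap;

/** Hypothetical helper (not part of Beam): counts timers per output timestamp. */
class OutputTimestampCounts {
  // Key: output timestamp in millis. Value: number of timers currently holding it.
  private final TreeMap<Long, Integer> counts = new TreeMap<>();

  /** Registers one timer with the given output timestamp. */
  void add(long outputTimestampMillis) {
    counts.merge(outputTimestampMillis, 1, Integer::sum);
  }

  /** Releases one timer; the entry is dropped once no timer holds the timestamp. */
  void remove(long outputTimestampMillis) {
    // Returning null from the remapping function removes the mapping.
    counts.computeIfPresent(outputTimestampMillis, (k, v) -> v > 1 ? v - 1 : null);
  }

  /** Earliest held output timestamp, or Long.MAX_VALUE if nothing is held. */
  long minOutputTimestamp() {
    return counts.isEmpty() ? Long.MAX_VALUE : counts.firstKey();
  }
}
```

The trade-off is that a counter cannot tell which timer contributed to a hold, so the caller would need to know a timer's previous output timestamp when that timer is reset or deleted. Whether that information is cheaply available in DoFnOperator is not established here.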



