Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/12 16:16:35 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #13032: [BEAM-11034] Avoid build-up of stateful garbage collection timers for…

kennknowles commented on a change in pull request #13032:
URL: https://github.com/apache/beam/pull/13032#discussion_r503400133



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
##########
@@ -486,14 +487,23 @@ private void processTimers(
     for (W window : windowsToCleanup) {
       // The stepContext is the thing that knows if it is batch or streaming, hence
       // whether state needs to be cleaned up or will simply be discarded so the
-      // timer can be ignored
-
+      // timer can be ignored.
       Instant cleanupTime = earliestAllowableCleanupTime(window, windowingStrategy);
-      // if DoFn has OnWindowExpiration then set holds for system timer.
-      Instant cleanupOutputTimestamp =
-          fnSignature.onWindowExpiration() == null ? cleanupTime : cleanupTime.minus(1L);
-      stepContext.setStateCleanupTimer(
-          CLEANUP_TIMER_ID, window, windowCoder, cleanupTime, cleanupOutputTimestamp);
+      // Set a cleanup timer for state at the end of the window to trigger onWindowExpiration and
+      // garbage collect state. We avoid doing this for the global window if there is no window
+      // expiration set as the state will be cleaned up when the pipeline terminates. Setting the timer
+      // leads to an unbounded growth of timers for pipelines with many unique keys in the global
+      // window.
+      if (cleanupTime.isBefore(GlobalWindow.INSTANCE.maxTimestamp())

Review comment:
       Re-reading the bug and thinking about your comments, can we eliminate this condition and invoke from some `onDrain` method?
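To make the trade-off concrete, the decision described in the new code comment can be modeled outside Beam as a small, hypothetical sketch. The condition in the hunk is truncated, so the rule used here is an assumption taken only from the comment text: the timer is set when the window's cleanup time is before the global window's max timestamp, or when the DoFn declares `@OnWindowExpiration`. `GLOBAL_MAX_MILLIS`, `shouldSetCleanupTimer`, and `countTimers` are illustrative names, not Beam APIs, and `GLOBAL_MAX_MILLIS` is only a stand-in for `GlobalWindow.INSTANCE.maxTimestamp()`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, Beam-free model of the cleanup-timer decision described in the
 * SimpleParDoFn comment. None of these names are Beam APIs.
 */
final class CleanupTimerSketch {
  // Stand-in for GlobalWindow.INSTANCE.maxTimestamp(); the real Beam value differs.
  static final long GLOBAL_MAX_MILLIS = Long.MAX_VALUE / 1000;

  // Assumed rule (from the code comment, not the truncated condition): set the
  // timer if the window ends before the global window's max timestamp, or if the
  // DoFn declares @OnWindowExpiration and therefore still needs the callback.
  static boolean shouldSetCleanupTimer(long cleanupTimeMillis, boolean hasOnWindowExpiration) {
    return cleanupTimeMillis < GLOBAL_MAX_MILLIS || hasOnWindowExpiration;
  }

  // Counts the timers held for `uniqueKeys` keys in one window: one timer per key,
  // because re-setting the same timer ID for a key overwrites the previous one.
  static int countTimers(int uniqueKeys, long cleanupTimeMillis, boolean hasOnWindowExpiration) {
    Map<String, Long> timers = new HashMap<>();
    for (int k = 0; k < uniqueKeys; k++) {
      if (shouldSetCleanupTimer(cleanupTimeMillis, hasOnWindowExpiration)) {
        timers.put("key-" + k, cleanupTimeMillis);
      }
    }
    return timers.size();
  }

  public static void main(String[] args) {
    // Global window, no @OnWindowExpiration: no per-key timers are set.
    System.out.println(countTimers(1_000, GLOBAL_MAX_MILLIS, false)); // prints 0
    // Global window with @OnWindowExpiration: one timer per key, firing only when
    // the watermark reaches the end of the global window.
    System.out.println(countTimers(1_000, GLOBAL_MAX_MILLIS, true)); // prints 1000
    // Finite window ending at 60s: one timer per key, each fires once the watermark passes it.
    System.out.println(countTimers(1_000, 60_000L, false)); // prints 1000
  }
}
```

In the global-window case the timer count grows with the number of distinct keys and nothing fires to release them, which is the build-up the PR targets. The reviewer's suggestion would remove the condition altogether and run the expiration logic from a drain-time hook instead; no such `onDrain` method appears in the hunk, so it is not modeled here.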
