You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:26 UTC

[33/50] [abbrv] incubator-beam git commit: Fix overflow in ReduceFnRunner garbage collection times

Fix overflow in ReduceFnRunner garbage collection times


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f7a2ab4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f7a2ab4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f7a2ab4

Branch: refs/heads/python-sdk
Commit: 4f7a2ab47c5fdd9b3de5f091a40128e68ddd11a3
Parents: 5bf732c
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:10:09 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:30 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/ReduceFnRunner.java | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f7a2ab4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 34208da..864e8e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -936,16 +936,21 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   /**
-   * Return when {@code window} should be garbage collected. If the window is the GlobalWindow,
-   * that will be the end of the window. Otherwise, add the allowed lateness to the end of
-   * the window.
+   * Return when {@code window} should be garbage collected. If the window's expiration time is on
+   * or after the end of the global window, it will be truncated to the end of the global window.
    */
   private Instant garbageCollectionTime(W window) {
-    Instant maxTimestamp = window.maxTimestamp();
-    if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) {
-      return maxTimestamp.plus(windowingStrategy.getAllowedLateness());
+
+    // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
+    // global window, then we truncate it. The conditional is phrased like it is because the
+    // addition of EOW + allowed lateness might even overflow the maximum allowed Instant
+    if (GlobalWindow.INSTANCE
+        .maxTimestamp()
+        .minus(windowingStrategy.getAllowedLateness())
+        .isBefore(window.maxTimestamp())) {
+      return GlobalWindow.INSTANCE.maxTimestamp();
     } else {
-      return maxTimestamp;
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     }
   }