You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:26 UTC
[33/50] [abbrv] incubator-beam git commit: Fix overflow in ReduceFnRunner garbage collection times
Fix overflow in ReduceFnRunner garbage collection times
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f7a2ab4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f7a2ab4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f7a2ab4
Branch: refs/heads/python-sdk
Commit: 4f7a2ab47c5fdd9b3de5f091a40128e68ddd11a3
Parents: 5bf732c
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:10:09 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:30 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/ReduceFnRunner.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f7a2ab4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 34208da..864e8e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -936,16 +936,21 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
/**
- * Return when {@code window} should be garbage collected. If the window is the GlobalWindow,
- * that will be the end of the window. Otherwise, add the allowed lateness to the end of
- * the window.
+ * Return when {@code window} should be garbage collected. If the window's expiration time is on
+ * or after the end of the global window, it will be truncated to the end of the global window.
*/
private Instant garbageCollectionTime(W window) {
- Instant maxTimestamp = window.maxTimestamp();
- if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) {
- return maxTimestamp.plus(windowingStrategy.getAllowedLateness());
+
+ // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
+ // global window, then we truncate it. The conditional is phrased as a subtraction because the
+ // addition of EOW + allowed lateness might even overflow the maximum allowed Instant.
+ if (GlobalWindow.INSTANCE
+ .maxTimestamp()
+ .minus(windowingStrategy.getAllowedLateness())
+ .isBefore(window.maxTimestamp())) {
+ return GlobalWindow.INSTANCE.maxTimestamp();
} else {
- return maxTimestamp;
+ return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
}
}