You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/16 18:05:37 UTC
[1/3] incubator-beam git commit: Add test for ReduceFnRunner GC time
overflow
Repository: incubator-beam
Updated Branches:
refs/heads/master 0a3425e77 -> 455aaed55
Add test for ReduceFnRunner GC time overflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/925264e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/925264e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/925264e3
Branch: refs/heads/master
Commit: 925264e3eb244172e57c134cb3bf89db3d98258c
Parents: f8c59bd
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:04:10 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 16 11:04:22 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/WindowMatchers.java | 5 ++
.../beam/sdk/util/ReduceFnRunnerTest.java | 68 ++++++++++++++++++++
2 files changed, 73 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/925264e3/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
index b47c32c..7a5e2fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
@@ -47,6 +47,11 @@ public class WindowMatchers {
return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
}
+ public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+ Matcher<? super T> valueMatcher) {
+ return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+ }
+
public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
T value, long timestamp, long windowStart, long windowEnd) {
return WindowMatchers.<T>isSingleWindowedValue(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/925264e3/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 0df4bc6..b7ec540 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.util;
import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -51,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -79,6 +82,7 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -226,6 +230,70 @@ public class ReduceFnRunnerTest {
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}
+ /**
+ * Tests that the garbage collection time for a fixed window does not overflow the end of time.
+ */
+ @Test
+ public void testFixedWindowEndOfTimeGarbageCollection() throws Exception {
+
+ Duration allowedLateness = Duration.standardDays(365);
+ Duration windowSize = Duration.millis(10);
+ WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize);
+
+ // This timestamp falls into a window where the end of the window is before the end of the
+ // global window - the "end of time" - yet its expiration time is after.
+ final Instant elementTimestamp =
+ GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1);
+
+ IntervalWindow window = Iterables.getOnlyElement(
+ windowFn.assignWindows(
+ windowFn.new AssignContext() {
+ @Override
+ public Object element() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public Instant timestamp() {
+ return elementTimestamp;
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ throw new UnsupportedOperationException();
+ }
+ }));
+
+ assertTrue(
+ window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp()));
+ assertTrue(
+ window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
+
+ // Set up a combining (summing) runner in discarding mode with the large allowed lateness.
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(
+ windowFn,
+ AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()).buildTrigger(),
+ AccumulationMode.DISCARDING_FIRED_PANES,
+ new Sum.SumIntegerFn().<String>asKeyedFn(),
+ VarIntCoder.of(),
+ allowedLateness);
+
+ tester.injectElements(TimestampedValue.of(13, elementTimestamp));
+
+ // Should fire ON_TIME pane and there will be a checkState that the cleanup time
+ // is prior to timestamp max value
+ tester.advanceInputWatermark(window.maxTimestamp());
+
+ // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner)
+ assertThat(tester.extractOutput(), emptyIterable());
+
+ tester.injectElements(TimestampedValue.of(42, elementTimestamp));
+
+ // Now the final pane should fire, demonstrating that the GC time was truncated
+ tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp());
+ assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
+ }
+
@Test
public void testOnElementCombiningAccumulating() throws Exception {
// Test basic execution of a trigger using a non-combining window set and accumulating mode.
[3/3] incubator-beam git commit: This closes #464
Posted by ke...@apache.org.
This closes #464
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/455aaed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/455aaed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/455aaed5
Branch: refs/heads/master
Commit: 455aaed5557da4a07dd86d94a1a4fe709455d6aa
Parents: 0a3425e dc60dc7
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 16 11:04:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 16 11:04:57 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/ReduceFnRunner.java | 19 ++++--
.../org/apache/beam/sdk/WindowMatchers.java | 5 ++
.../beam/sdk/util/ReduceFnRunnerTest.java | 68 ++++++++++++++++++++
3 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Fix overflow in ReduceFnRunner
garbage collection times
Posted by ke...@apache.org.
Fix overflow in ReduceFnRunner garbage collection times
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc60dc7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc60dc7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc60dc7c
Branch: refs/heads/master
Commit: dc60dc7ce22f95e12c99bb65f258931f330444c9
Parents: 925264e
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:10:09 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 16 11:04:23 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/util/ReduceFnRunner.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc60dc7c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 34208da..864e8e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -936,16 +936,21 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
/**
- * Return when {@code window} should be garbage collected. If the window is the GlobalWindow,
- * that will be the end of the window. Otherwise, add the allowed lateness to the end of
- * the window.
+ * Return when {@code window} should be garbage collected. If the window's expiration time is on
+ * or after the end of the global window, it will be truncated to the end of the global window.
*/
private Instant garbageCollectionTime(W window) {
- Instant maxTimestamp = window.maxTimestamp();
- if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) {
- return maxTimestamp.plus(windowingStrategy.getAllowedLateness());
+
+ // If the end of the window + allowed lateness is beyond the "end of time", aka the end of the
+ // global window, then we truncate it. The conditional is phrased this way because computing
+ // EOW + allowed lateness directly might overflow the maximum allowed Instant.
+ if (GlobalWindow.INSTANCE
+ .maxTimestamp()
+ .minus(windowingStrategy.getAllowedLateness())
+ .isBefore(window.maxTimestamp())) {
+ return GlobalWindow.INSTANCE.maxTimestamp();
} else {
- return maxTimestamp;
+ return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
}
}