You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:02 UTC
[09/50] [abbrv] incubator-beam git commit: Add test for
ReduceFnRunner GC time overflow
Add test for ReduceFnRunner GC time overflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bf732cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bf732cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bf732cd
Branch: refs/heads/python-sdk
Commit: 5bf732cd3e598321a5c51e1239eda0fe2877a65d
Parents: 6058330
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:04:10 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:29 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/WindowMatchers.java | 5 ++
.../beam/sdk/util/ReduceFnRunnerTest.java | 68 ++++++++++++++++++++
2 files changed, 73 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf732cd/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
index b47c32c..7a5e2fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
@@ -47,6 +47,11 @@ public class WindowMatchers {
return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
}
+ public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+ Matcher<? super T> valueMatcher) {
+ return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+ }
+
public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
T value, long timestamp, long windowStart, long windowEnd) {
return WindowMatchers.<T>isSingleWindowedValue(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf732cd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 0df4bc6..b7ec540 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.util;
import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -51,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -79,6 +82,7 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -226,6 +230,70 @@ public class ReduceFnRunnerTest {
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}
+ /**
+ * Tests that the garbage collection time for a fixed window does not overflow the end of time.
+ */
+ @Test
+ public void testFixedWindowEndOfTimeGarbageCollection() throws Exception {
+
+ Duration allowedLateness = Duration.standardDays(365);
+ Duration windowSize = Duration.millis(10);
+ WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize);
+
+ // This timestamp falls into a window where the end of the window is before the end of the
+ // global window - the "end of time" - yet its expiration time is after.
+ final Instant elementTimestamp =
+ GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1);
+
+ IntervalWindow window = Iterables.getOnlyElement(
+ windowFn.assignWindows(
+ windowFn.new AssignContext() {
+ @Override
+ public Object element() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public Instant timestamp() {
+ return elementTimestamp;
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ throw new UnsupportedOperationException();
+ }
+ }));
+
+ assertTrue(
+ window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp()));
+ assertTrue(
+ window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
+
+ // Test basic execution of a trigger using a non-combining window set and accumulating mode.
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(
+ windowFn,
+ AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()).buildTrigger(),
+ AccumulationMode.DISCARDING_FIRED_PANES,
+ new Sum.SumIntegerFn().<String>asKeyedFn(),
+ VarIntCoder.of(),
+ allowedLateness);
+
+ tester.injectElements(TimestampedValue.of(13, elementTimestamp));
+
+ // Should fire ON_TIME pane and there will be a checkState that the cleanup time
+ // is prior to timestamp max value
+ tester.advanceInputWatermark(window.maxTimestamp());
+
+ // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner)
+ assertThat(tester.extractOutput(), emptyIterable());
+
+ tester.injectElements(TimestampedValue.of(42, elementTimestamp));
+
+ // Now the final pane should fire, demonstrating that the GC time was truncated
+ tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp());
+ assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
+ }
+
@Test
public void testOnElementCombiningAccumulating() throws Exception {
// Test basic execution of a trigger using a non-combining window set and accumulating mode.