You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:48 UTC
[14/17] incubator-beam git commit: Restore prior trigger files,
for temporary compatibility
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
new file mode 100644
index 0000000..7e6e938
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterSynchronizedProcessingTime}.
+ */
+@RunWith(JUnit4.class)
+public class AfterSynchronizedProcessingTimeTest {
+
+ private Trigger underTest = new AfterSynchronizedProcessingTime();
+
+ @Test
+ public void testAfterProcessingTimeWithFixedWindows() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ FixedWindows.of(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+
+ // Timer at 15
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.advanceProcessingTime(new Instant(12));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ // Load up elements in the next window, timer at 17 for them
+ tester.injectElements(11, 12, 13);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Not quite time to fire
+ tester.advanceProcessingTime(new Instant(14));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
+ tester.injectElements(2, 3);
+
+ // Advance past the first timer and fire, finishing the first window
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.isMarkedFinished(firstWindow));
+
+ // The next window fires and finishes now
+ tester.advanceProcessingTime(new Instant(18));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(secondWindow);
+ assertTrue(tester.isMarkedFinished(secondWindow));
+ }
+
+ @Test
+ public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ Sessions.withGapDuration(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+ tester.injectElements(1); // in [1, 11), timer for 15
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ tester.advanceProcessingTime(new Instant(12));
+ tester.injectElements(3); // in [3, 13), timer for 17
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
+
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+ underTest.getWatermarkThatGuaranteesFiring(
+ new IntervalWindow(new Instant(0), new Instant(10))));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ assertEquals(underTest, underTest.getContinuationTrigger());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
new file mode 100644
index 0000000..084027b
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests the {@link AfterWatermark} triggers.
+ */
+@RunWith(JUnit4.class)
+public class AfterWatermarkTest {
+
+ @Mock private OnceTrigger mockEarly;
+ @Mock private OnceTrigger mockLate;
+
+ private SimpleTriggerTester<IntervalWindow> tester;
+ private static Trigger.TriggerContext anyTriggerContext() {
+ return Mockito.<Trigger.TriggerContext>any();
+ }
+ private static Trigger.OnElementContext anyElementContext() {
+ return Mockito.<Trigger.OnElementContext>any();
+ }
+
+ private void injectElements(int... elements) throws Exception {
+ for (int element : elements) {
+ doNothing().when(mockEarly).onElement(anyElementContext());
+ doNothing().when(mockLate).onElement(anyElementContext());
+ tester.injectElements(element);
+ }
+ }
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
+ throws Exception {
+
+ // Don't fire due to mock saying no
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ assertFalse(tester.shouldFire(window)); // not ready
+
+ // Fire due to mock trigger; early trigger is required to be a OnceTrigger
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertTrue(tester.shouldFire(window)); // ready
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testEarlyAndAtWatermark() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(mockEarly),
+ FixedWindows.of(Duration.millis(100)));
+
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ testRunningAsTrigger(mockEarly, window);
+
+ // Fire due to watermark
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testAtWatermarkAndLate() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withLateFirings(mockLate),
+ FixedWindows.of(Duration.millis(100)));
+
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ // No early firing, just double checking
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertFalse(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // Fire due to watermark
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ testRunningAsTrigger(mockLate, window);
+ }
+
+ @Test
+ public void testEarlyAndAtWatermarkAndLate() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(mockEarly)
+ .withLateFirings(mockLate),
+ FixedWindows.of(Duration.millis(100)));
+
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ testRunningAsTrigger(mockEarly, window);
+
+ // Fire due to watermark
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ testRunningAsTrigger(mockLate, window);
+ }
+
+ /**
+ * Tests that if the EOW is finished in both as well as the merged window, then
+ * it is finished in the merged result.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testOnMergeAlreadyFinished() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterEach.inOrder(
+ AfterWatermark.pastEndOfWindow(),
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Confirm that we are on the second trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.injectElements(1);
+ tester.injectElements(5);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Merging should leave it finished
+ tester.mergeWindows();
+
+ // Confirm that we are on the second trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that the trigger rewinds to be non-finished in the merged window.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testOnMergeRewinds() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterEach.inOrder(
+ AfterWatermark.pastEndOfWindow(),
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Confirm that we are on the second trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(firstWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Merging should re-activate the watermark trigger in the merged window
+ tester.mergeWindows();
+
+ // Confirm that we are not on the second trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // And confirm that advancing the watermark fires again
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that if the EOW is finished in both as well as the merged window, then
+ * it is finished in the merged result.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+ .withLateFirings(AfterPane.elementCountAtLeast(1)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Confirm that we are on the late trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.injectElements(1);
+ tester.injectElements(5);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Merging should leave it on the late trigger
+ tester.mergeWindows();
+
+ // Confirm that we are on the late trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that the trigger rewinds to be non-finished in the merged window.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testEarlyAndLateOnMergeRewinds() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+ .withLateFirings(AfterPane.elementCountAtLeast(1)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Confirm that we are on the late trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(firstWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Merging should re-activate the early trigger in the merged window
+ tester.mergeWindows();
+
+ // Confirm that we are not on the second trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // And confirm that advancing the watermark fires again
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ @Test
+ public void testFromEndOfWindowToString() {
+ Trigger trigger = AfterWatermark.pastEndOfWindow();
+ assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+ }
+
+ @Test
+ public void testEarlyFiringsToString() {
+ Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1"));
+
+ assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString());
+ }
+
+ @Test
+ public void testLateFiringsToString() {
+ Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1"));
+
+ assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString());
+ }
+
+ @Test
+ public void testEarlyAndLateFiringsToString() {
+ Trigger trigger =
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(StubTrigger.named("t1"))
+ .withLateFirings(StubTrigger.named("t2"));
+
+ assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
+ trigger.toString());
+ }
+
+ @Test
+ public void testToStringExcludesNeverTrigger() {
+ Trigger trigger =
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(Never.ever())
+ .withLateFirings(Never.ever());
+
+ assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
new file mode 100644
index 0000000..673e555
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link DefaultTrigger}, which should be equivalent to
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+ */
+@RunWith(JUnit4.class)
+public class DefaultTriggerTest {
+
+ SimpleTriggerTester<IntervalWindow> tester;
+
+ @Test
+ public void testDefaultTriggerFixedWindows() throws Exception {
+ tester = TriggerTester.forTrigger(
+ DefaultTrigger.of(),
+ FixedWindows.of(Duration.millis(100)));
+
+ tester.injectElements(
+ 1, // [0, 100)
+ 101); // [100, 200)
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200));
+
+ // Advance the watermark almost to the end of the first window.
+ tester.advanceInputWatermark(new Instant(99));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Advance watermark past end of the first window, which is then ready
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Fire, but the first window is still allowed to fire
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Advance watermark to 200, then both are ready
+ tester.advanceInputWatermark(new Instant(200));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+
+ assertFalse(tester.isMarkedFinished(firstWindow));
+ assertFalse(tester.isMarkedFinished(secondWindow));
+ }
+
+ @Test
+ public void testDefaultTriggerSlidingWindows() throws Exception {
+ tester = TriggerTester.forTrigger(
+ DefaultTrigger.of(),
+ SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
+
+ tester.injectElements(
+ 1, // [-50, 50), [0, 100)
+ 50); // [0, 100), [50, 150)
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100));
+ IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150));
+
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+
+ // At 50, the first becomes ready; it stays ready after firing
+ tester.advanceInputWatermark(new Instant(50));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+
+ // At 99, the first is still the only one ready
+ tester.advanceInputWatermark(new Instant(99));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+
+ // At 100, the first and second are ready
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ assertFalse(tester.isMarkedFinished(firstWindow));
+ assertFalse(tester.isMarkedFinished(secondWindow));
+ assertFalse(tester.isMarkedFinished(thirdWindow));
+ }
+
+ @Test
+ public void testDefaultTriggerSessions() throws Exception {
+ tester = TriggerTester.forTrigger(
+ DefaultTrigger.of(),
+ Sessions.withGapDuration(Duration.millis(100)));
+
+ tester.injectElements(
+ 1, // [1, 101)
+ 50); // [50, 150)
+ tester.mergeWindows();
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150));
+
+ // Not ready in any window yet
+ tester.advanceInputWatermark(new Instant(100));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // The first window is "ready": the caller owns knowledge of which windows are merged away
+ tester.advanceInputWatermark(new Instant(149));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // Now ready on all windows
+ tester.advanceInputWatermark(new Instant(150));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ assertTrue(tester.shouldFire(mergedWindow));
+
+ // Ensure it repeats
+ tester.fireIfShouldFire(mergedWindow);
+ assertTrue(tester.shouldFire(mergedWindow));
+
+ assertFalse(tester.isMarkedFinished(mergedWindow));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
+ new IntervalWindow(new Instant(0), new Instant(10))));
+ assertEquals(GlobalWindow.INSTANCE.maxTimestamp(),
+ DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
new file mode 100644
index 0000000..fb2b4d5
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Never}.
+ */
+@RunWith(JUnit4.class)
+public class NeverTest {
+ private SimpleTriggerTester<IntervalWindow> triggerTester;
+
+ @Before
+ public void setup() throws Exception {
+ triggerTester =
+ TriggerTester.forTrigger(
+ Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
+ }
+
+ @Test
+ public void falseAfterEndOfWindow() throws Exception {
+ triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
+ IntervalWindow window =
+ new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
+ assertThat(triggerTester.shouldFire(window), is(false));
+ triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ assertThat(triggerTester.shouldFire(window), is(false));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
new file mode 100644
index 0000000..7289d97
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link OrFinallyTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class OrFinallyTriggerTest {
+
+ private SimpleTriggerTester<IntervalWindow> tester;
+
+ /**
+ * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+ * fires and finishes, the {@code OrFinally} also fires and finishes.
+ */
+ @Test
+ public void testActualFiresAndFinishes() throws Exception {
+ tester = TriggerTester.forTrigger(
+ new OrFinallyTrigger(
+ AfterPane.elementCountAtLeast(2),
+ AfterPane.elementCountAtLeast(100)),
+ FixedWindows.of(Duration.millis(100)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ // Not yet firing
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The actual fires and finishes
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ /**
+ * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+ * fires but does not finish, the {@code OrFinally} also fires and also does not
+ * finish.
+ */
+ @Test
+ public void testActualFiresOnly() throws Exception {
+ tester = TriggerTester.forTrigger(
+ new OrFinallyTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+ AfterPane.elementCountAtLeast(100)),
+ FixedWindows.of(Duration.millis(100)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ // Not yet firing
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The actual fires but does not finish
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // And again
+ tester.injectElements(3, 4);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+ }
+
+ /**
+ * Tests that if the first trigger rewinds to be non-finished in the merged window,
+ * then it becomes the currently active trigger again, with real triggers.
+ */
+ @Test
+ public void testShouldFireAfterMerge() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterEach.inOrder(
+ AfterPane.elementCountAtLeast(5)
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ // Finished the orFinally in the first window
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Set up second window where it is not done
+ tester.injectElements(5);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Merge them, if the merged window were on the second trigger, it would be ready
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // Now adding 3 more makes the main trigger ready to fire
+ tester.injectElements(1, 2, 3, 4, 5);
+ tester.mergeWindows();
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that for {@code OrFinally(actual, until)} when {@code actual}
+ * fires but does not finish, then {@code until} fires and finishes, the
+ * whole thing fires and finished.
+ */
+ @Test
+ public void testActualFiresButUntilFinishes() throws Exception {
+ tester = TriggerTester.forTrigger(
+ new OrFinallyTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+ AfterPane.elementCountAtLeast(3)),
+ FixedWindows.of(Duration.millis(10)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+ // Before any firing
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The actual fires but doesn't finish
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The until fires and finishes; the trigger is finished
+ tester.injectElements(3);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+ assertEquals(new Instant(9),
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+ .getWatermarkThatGuaranteesFiring(window));
+ assertEquals(new Instant(9), Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+ .orFinally(AfterPane.elementCountAtLeast(1))
+ .getWatermarkThatGuaranteesFiring(window));
+ assertEquals(new Instant(9), Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+ .orFinally(AfterWatermark.pastEndOfWindow())
+ .getWatermarkThatGuaranteesFiring(window));
+ assertEquals(new Instant(9),
+ AfterPane.elementCountAtLeast(100)
+ .orFinally(AfterWatermark.pastEndOfWindow())
+ .getWatermarkThatGuaranteesFiring(window));
+
+ assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+ .orFinally(AfterPane.elementCountAtLeast(10))
+ .getWatermarkThatGuaranteesFiring(window));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane();
+ OnceTrigger triggerB = AfterWatermark.pastEndOfWindow();
+ Trigger aOrFinallyB = triggerA.orFinally(triggerB);
+ Trigger bOrFinallyA = triggerB.orFinally(triggerA);
+ assertEquals(
+ Repeatedly.forever(
+ triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
+ aOrFinallyB.getContinuationTrigger());
+ assertEquals(
+ Repeatedly.forever(
+ triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
+ bOrFinallyA.getContinuationTrigger());
+ }
+
+ @Test
+ public void testToString() {
+ Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2"));
+ assertEquals("t1.orFinally(t2)", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
new file mode 100644
index 0000000..6e8930d
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link Repeatedly}.
+ */
+@RunWith(JUnit4.class)
+public class RepeatedlyTest {
+
+ @Mock private Trigger mockTrigger;
+ private SimpleTriggerTester<IntervalWindow> tester;
+ private static Trigger.TriggerContext anyTriggerContext() {
+ return Mockito.<Trigger.TriggerContext>any();
+ }
+
+ public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception {
+ MockitoAnnotations.initMocks(this);
+ tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn);
+ }
+
+ /**
+ * Tests that onElement correctly passes the data on to the subtrigger.
+ */
+ @Test
+ public void testOnElement() throws Exception {
+ setUp(FixedWindows.of(Duration.millis(10)));
+ tester.injectElements(37);
+ verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
+ }
+
+ /**
+ * Tests that the repeatedly is ready to fire whenever the subtrigger is ready.
+ */
+ @Test
+ public void testShouldFire() throws Exception {
+ setUp(FixedWindows.of(Duration.millis(10)));
+
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
+
+ when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
+ .thenReturn(false);
+ assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
+ }
+
+ /**
+ * Tests that the watermark that guarantees firing is that of the subtrigger.
+ */
+ @Test
+ public void testFireDeadline() throws Exception {
+ setUp(FixedWindows.of(Duration.millis(10)));
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ Instant arbitraryInstant = new Instant(34957849);
+
+ when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any()))
+ .thenReturn(arbitraryInstant);
+
+ assertThat(
+ Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window),
+ equalTo(arbitraryInstant));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
+ Trigger repeatedly = Repeatedly.forever(trigger);
+ assertEquals(
+ Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger());
+ assertEquals(
+ Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()),
+ repeatedly.getContinuationTrigger().getContinuationTrigger());
+ }
+
+ @Test
+ public void testShouldFireAfterMerge() throws Exception {
+ tester = TriggerTester.forTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ tester.injectElements(5);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Merge them, if the merged window were on the second trigger, it would be ready
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ @Test
+ public void testRepeatedlyAfterFirstElementCount() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(
+ AfterFirst.of(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(15)),
+ AfterPane.elementCountAtLeast(5))),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2, 3, 4, 5);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+ @Test
+ public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(
+ AfterFirst.of(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(15)),
+ AfterPane.elementCountAtLeast(5))),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+ @Test
+ public void testRepeatedlyElementCount() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2, 3, 4, 5);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+ @Test
+ public void testRepeatedlyProcessingTime() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(15))),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+
+ @Test
+ public void testToString() {
+ Trigger trigger = Repeatedly.forever(new StubTrigger() {
+ @Override
+ public String toString() {
+ return "innerTrigger";
+ }
+ });
+
+ assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
new file mode 100644
index 0000000..b258a79
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.joda.time.Instant;
+
+/**
+ * No-op {@link OnceTrigger} implementation for testing.
+ */
+abstract class StubTrigger extends Trigger.OnceTrigger {
+ /**
+ * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}.
+ */
+ static StubTrigger named(final String name) {
+ return new StubTrigger() {
+ @Override
+ public String toString() {
+ return name;
+ }
+ };
+ }
+
+ protected StubTrigger() {
+ super(Lists.<Trigger>newArrayList());
+ }
+
+ @Override
+ protected void onOnlyFiring(TriggerContext context) throws Exception {
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ }
+
+ @Override
+ public boolean shouldFire(TriggerContext context) throws Exception {
+ return false;
+ }
+
+ @Override
+ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return null;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
new file mode 100644
index 0000000..cfc03b2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Trigger}.
+ */
+@RunWith(JUnit4.class)
+public class TriggerTest {
+
+ @Test
+ public void testTriggerToString() throws Exception {
+ assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString());
+ assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())",
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString());
+ }
+
+ @Test
+ public void testIsCompatible() throws Exception {
+ assertTrue(new Trigger1(null).isCompatible(new Trigger1(null)));
+ assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))
+ .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
+
+ assertFalse(new Trigger1(null).isCompatible(new Trigger2(null)));
+ assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null)))
+ .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
+ }
+
+ private static class Trigger1 extends Trigger {
+
+ private Trigger1(List<Trigger> subTriggers) {
+ super(subTriggers);
+ }
+
+ @Override
+ public void onElement(Trigger.OnElementContext c) { }
+
+ @Override
+ public void onMerge(Trigger.OnMergeContext c) { }
+
+ @Override
+ protected Trigger getContinuationTrigger(
+ List<Trigger> continuationTriggers) {
+ return null;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return null;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception { }
+ }
+
+ private static class Trigger2 extends Trigger {
+
+ private Trigger2(List<Trigger> subTriggers) {
+ super(subTriggers);
+ }
+
+ @Override
+ public void onElement(Trigger.OnElementContext c) { }
+
+ @Override
+ public void onMerge(Trigger.OnMergeContext c) { }
+
+ @Override
+ protected Trigger getContinuationTrigger(
+ List<Trigger> continuationTriggers) {
+ return null;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return null;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
new file mode 100644
index 0000000..5fe17ad
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Test utility that runs a {@link Trigger}, using in-memory stub implementation to provide
+ * the {@link StateInternals}.
+ *
+ * @param <W> The type of windows being used.
+ */
+public class TriggerTester<InputT, W extends BoundedWindow> {
+
+ /**
+ * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps
+ * can be conflated. Today, triggers should not observed the element type, so this is the
+ * only trigger tester that needs to be used.
+ */
+ public static class SimpleTriggerTester<W extends BoundedWindow>
+ extends TriggerTester<Integer, W> {
+
+ private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
+ super(windowingStrategy);
+ }
+
+ public void injectElements(int... values) throws Exception {
+ List<TimestampedValue<Integer>> timestampedValues =
+ Lists.newArrayListWithCapacity(values.length);
+ for (int value : values) {
+ timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
+ }
+ injectElements(timestampedValues);
+ }
+
+ public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception {
+ return new SimpleTriggerTester<>(
+ windowingStrategy.withAllowedLateness(allowedLateness));
+ }
+ }
+
+ protected final WindowingStrategy<Object, W> windowingStrategy;
+
+ private final TestInMemoryStateInternals<?> stateInternals =
+ new TestInMemoryStateInternals<Object>(null /* key */);
+ private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+ private final TriggerContextFactory<W> contextFactory;
+ private final WindowFn<Object, W> windowFn;
+ private final ActiveWindowSet<W> activeWindows;
+ private final Map<W, W> windowToMergeResult;
+
+ /**
+ * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link Trigger}
+ * under test.
+ */
+ private final ExecutableTrigger executableTrigger;
+
+ /**
+ * A map from a window and trigger to whether that trigger is finished for the window.
+ */
+ private final Map<W, FinishedTriggers> finishedSets;
+
+ public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
+ Trigger trigger, WindowFn<Object, W> windowFn)
+ throws Exception {
+ WindowingStrategy<Object, W> windowingStrategy =
+ WindowingStrategy.of(windowFn).withTrigger(trigger)
+ // Merging requires accumulation mode or early firings can break up a session.
+ // Not currently an issue with the tester (because we never GC) but we don't want
+ // mystery failures due to violating this need.
+ .withMode(windowFn.isNonMerging()
+ ? AccumulationMode.DISCARDING_FIRED_PANES
+ : AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+ return new SimpleTriggerTester<>(windowingStrategy);
+ }
+
+ public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger(
+ Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
+ WindowingStrategy<Object, W> strategy =
+ WindowingStrategy.of(windowFn).withTrigger(trigger)
+ // Merging requires accumulation mode or early firings can break up a session.
+ // Not currently an issue with the tester (because we never GC) but we don't want
+ // mystery failures due to violating this need.
+ .withMode(windowFn.isNonMerging()
+ ? AccumulationMode.DISCARDING_FIRED_PANES
+ : AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+ return new TriggerTester<>(strategy);
+ }
+
+ protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
+ this.windowingStrategy = windowingStrategy;
+ this.windowFn = windowingStrategy.getWindowFn();
+ this.executableTrigger = windowingStrategy.getTrigger();
+ this.finishedSets = new HashMap<>();
+
+ this.activeWindows =
+ windowFn.isNonMerging()
+ ? new NonMergingActiveWindowSet<W>()
+ : new MergingActiveWindowSet<W>(windowFn, stateInternals);
+ this.windowToMergeResult = new HashMap<>();
+
+ this.contextFactory =
+ new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows);
+ }
+
+ /**
+ * Instructs the trigger to clear its state for the given window.
+ */
+ public void clearState(W window) throws Exception {
+ executableTrigger.invokeClear(contextFactory.base(window,
+ new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window)));
+ }
+
+ /**
+ * Asserts that the trigger has actually cleared all of its state for the given window. Since
+ * the trigger under test is the root, this makes the assert for all triggers regardless
+ * of their position in the trigger tree.
+ */
+ public void assertCleared(W window) {
+ for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) {
+ if (untypedNamespace instanceof WindowAndTriggerNamespace) {
+ @SuppressWarnings("unchecked")
+ WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace;
+ if (namespace.getWindow().equals(window)) {
+ Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
+ assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty());
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the {@link Trigger} under test is finished for the given window.
+ */
+ public boolean isMarkedFinished(W window) {
+ FinishedTriggers finishedSet = finishedSets.get(window);
+ if (finishedSet == null) {
+ return false;
+ }
+
+ return finishedSet.isFinished(executableTrigger);
+ }
+
+ private StateNamespace windowNamespace(W window) {
+ return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window));
+ }
+
+ /**
+ * Advance the input watermark to the specified time, then advance the output watermark as far as
+ * possible.
+ */
+ public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+ // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+ timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark);
+ }
+
+ /** Advance the processing time to the specified time. */
+ public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+ // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+ timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime);
+ }
+
+ /**
+ * Inject all the timestamped values (after passing through the window function) as if they
+ * arrived in a single chunk of a bundle (or work-unit).
+ */
+ @SafeVarargs
+ public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
+ injectElements(Arrays.asList(values));
+ }
+
+ public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception {
+ for (TimestampedValue<InputT> value : values) {
+ WindowTracing.trace("TriggerTester.injectElements: {}", value);
+ }
+
+ List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size());
+
+ for (TimestampedValue<InputT> input : values) {
+ try {
+ InputT value = input.getValue();
+ Instant timestamp = input.getTimestamp();
+ Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(
+ windowFn, value, timestamp, GlobalWindow.INSTANCE));
+
+ for (W window : assignedWindows) {
+ activeWindows.addActiveForTesting(window);
+
+ // Today, triggers assume onTimer firing at the watermark time, whether or not they
+ // explicitly set the timer themselves. So this tester must set it.
+ timerInternals.setTimer(
+ TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
+ }
+
+ windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ for (WindowedValue<InputT> windowedValue : windowedValues) {
+ for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
+ // SDK is responsible for type safety
+ @SuppressWarnings("unchecked")
+ W window = mergeResult((W) untypedWindow);
+
+ Trigger.OnElementContext context = contextFactory.createOnElementContext(window,
+ new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(),
+ executableTrigger, getFinishedSet(window));
+
+ if (!context.trigger().isFinished()) {
+ executableTrigger.invokeOnElement(context);
+ }
+ }
+ }
+ }
+
+ public boolean shouldFire(W window) throws Exception {
+ Trigger.TriggerContext context = contextFactory.base(
+ window,
+ new TestTimers(windowNamespace(window)),
+ executableTrigger, getFinishedSet(window));
+ executableTrigger.getSpec().prefetchShouldFire(context.state());
+ return executableTrigger.invokeShouldFire(context);
+ }
+
+ public void fireIfShouldFire(W window) throws Exception {
+ Trigger.TriggerContext context = contextFactory.base(
+ window,
+ new TestTimers(windowNamespace(window)),
+ executableTrigger, getFinishedSet(window));
+
+ executableTrigger.getSpec().prefetchShouldFire(context.state());
+ if (executableTrigger.invokeShouldFire(context)) {
+ executableTrigger.getSpec().prefetchOnFire(context.state());
+ executableTrigger.invokeOnFire(context);
+ if (context.trigger().isFinished()) {
+ activeWindows.remove(window);
+ executableTrigger.invokeClear(context);
+ }
+ }
+ }
+
+ public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) {
+ getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value);
+ }
+
+ /**
+ * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge
+ * events on to the trigger under test. Does not persist the fact that merging happened,
+ * since it is just to test the trigger's {@code OnMerge} method.
+ */
+ public final void mergeWindows() throws Exception {
+ windowToMergeResult.clear();
+ activeWindows.merge(new MergeCallback<W>() {
+ @Override
+ public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {}
+
+ @Override
+ public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+ List<W> activeToBeMerged = new ArrayList<W>();
+ for (W window : toBeMerged) {
+ windowToMergeResult.put(window, mergeResult);
+ if (activeWindows.isActive(window)) {
+ activeToBeMerged.add(window);
+ }
+ }
+ Map<W, FinishedTriggers> mergingFinishedSets =
+ Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
+ for (W oldWindow : activeToBeMerged) {
+ mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
+ }
+ executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
+ new TestTimers(windowNamespace(mergeResult)), executableTrigger,
+ getFinishedSet(mergeResult), mergingFinishedSets));
+ timerInternals.setTimer(TimerData.of(
+ windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
+ }
+ });
+ }
+
+ public W mergeResult(W window) {
+ W result = windowToMergeResult.get(window);
+ return result == null ? window : result;
+ }
+
+ private FinishedTriggers getFinishedSet(W window) {
+ FinishedTriggers finishedSet = finishedSets.get(window);
+ if (finishedSet == null) {
+ finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+ finishedSets.put(window, finishedSet);
+ }
+ return finishedSet;
+ }
+
+ private static class TestAssignContext<W extends BoundedWindow>
+ extends WindowFn<Object, W>.AssignContext {
+ private Object element;
+ private Instant timestamp;
+ private BoundedWindow window;
+
+ public TestAssignContext(
+ WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
+ windowFn.super();
+ this.element = element;
+ this.timestamp = timestamp;
+ this.window = window;
+ }
+
+ @Override
+ public Object element() {
+ return element;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return window;
+ }
+ }
+
+ private class TestTimers implements Timers {
+ private final StateNamespace namespace;
+
+ public TestTimers(StateNamespace namespace) {
+ checkArgument(namespace instanceof WindowNamespace);
+ this.namespace = namespace;
+ }
+
+ @Override
+ public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+ timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+ }
+
+ @Override
+ public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+ timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return timerInternals.currentProcessingTime();
+ }
+
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return timerInternals.currentSynchronizedProcessingTime();
+ }
+
+ @Override
+ public Instant currentEventTime() {
+ return timerInternals.currentInputWatermarkTime();
+ }
+ }
+}