You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:45 UTC
[11/17] incubator-beam git commit: Move triggers to runners-core
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
new file mode 100644
index 0000000..38d030e
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link AfterPane}.
+ */
+@RunWith(JUnit4.class)
+public class AfterPaneTest {
+
+ SimpleTriggerTester<IntervalWindow> tester;
+ /**
+ * Tests that the trigger does fire when enough elements are in a window, and that it only
+ * fires that window (no leakage).
+ */
+ @Test
+ public void testAfterPaneElementCountFixedWindows() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterPane.elementCountAtLeast(2),
+ FixedWindows.of(Duration.millis(10)));
+
+ tester.injectElements(1); // [0, 10)
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2); // [0, 10)
+ tester.injectElements(11); // [10, 20)
+
+ assertTrue(tester.shouldFire(window)); // ready to fire
+ tester.fireIfShouldFire(window); // and finished
+ assertTrue(tester.isMarkedFinished(window));
+
+ // But don't finish the other window
+ assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
+ }
+
+ @Test
+ public void testClear() throws Exception {
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterPane.elementCountAtLeast(2),
+ FixedWindows.of(Duration.millis(10)));
+
+ tester.injectElements(1, 2, 3);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.clearState(window);
+ tester.assertCleared(window);
+ }
+
+ @Test
+ public void testAfterPaneElementCountSessions() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterPane.elementCountAtLeast(2),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(
+ 1, // in [1, 11)
+ 2); // in [2, 12)
+
+ assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11))));
+ assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12))));
+
+ tester.mergeWindows();
+
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
+ assertTrue(tester.shouldFire(mergedWindow));
+ tester.fireIfShouldFire(mergedWindow);
+ assertTrue(tester.isMarkedFinished(mergedWindow));
+
+ // Because we closed the previous window, we don't have it around to merge with. So there
+ // will be a new FIRE_AND_FINISH result.
+ tester.injectElements(
+ 7, // in [7, 17)
+ 9); // in [9, 19)
+
+ tester.mergeWindows();
+
+ IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19));
+ assertTrue(tester.shouldFire(newMergedWindow));
+ tester.fireIfShouldFire(newMergedWindow);
+ assertTrue(tester.isMarkedFinished(newMergedWindow));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+ AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring(
+ new IntervalWindow(new Instant(0), new Instant(10))));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ assertEquals(
+ AfterPane.elementCountAtLeast(1),
+ AfterPane.elementCountAtLeast(100).getContinuationTrigger());
+ assertEquals(
+ AfterPane.elementCountAtLeast(1),
+ AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger());
+ }
+
+ @Test
+ public void testToString() {
+ Trigger trigger = AfterPane.elementCountAtLeast(5);
+ assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
new file mode 100644
index 0000000..13a7acf
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterProcessingTime}.
+ */
+@RunWith(JUnit4.class)
+public class AfterProcessingTimeTest {
+
+ /**
+ * Tests the basic property that the trigger does wait for processing time to be
+ * far enough advanced.
+ */
+ @Test
+ public void testAfterProcessingTimeFixedWindows() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ FixedWindows.of(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+
+ // Timer at 15
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.advanceProcessingTime(new Instant(12));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ // Load up elements in the next window, timer at 17 for them
+ tester.injectElements(11, 12, 13);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Not quite time to fire
+ tester.advanceProcessingTime(new Instant(14));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
+ tester.injectElements(2, 3);
+
+ // Advance past the first timer and fire, finishing the first window
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.isMarkedFinished(firstWindow));
+
+ // The next window fires and finishes now
+ tester.advanceProcessingTime(new Instant(18));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(secondWindow);
+ assertTrue(tester.isMarkedFinished(secondWindow));
+ }
+
+ /**
+ * Tests that when windows merge, if the trigger is waiting for "N millis after the first
+ * element" that it is relative to the earlier of the two merged windows.
+ */
+ @Test
+ public void testClear() throws Exception {
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ FixedWindows.of(Duration.millis(10)));
+
+ tester.injectElements(1, 2, 3);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.clearState(window);
+ tester.assertCleared(window);
+ }
+
+ @Test
+ public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.advanceProcessingTime(new Instant(10));
+ tester.injectElements(1); // in [1, 11), timer for 15
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ tester.advanceProcessingTime(new Instant(12));
+ tester.injectElements(3); // in [3, 13), timer for 17
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
+
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+ AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring(
+ new IntervalWindow(new Instant(0), new Instant(10))));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ OnceTrigger firstElementPlus1 =
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1));
+ assertEquals(
+ new AfterSynchronizedProcessingTime(),
+ firstElementPlus1.getContinuationTrigger());
+ }
+
+ /**
+ * Basic test of compatibility check between identical triggers.
+ */
+ @Test
+ public void testCompatibilityIdentical() throws Exception {
+ Trigger t1 = AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(1L));
+ Trigger t2 = AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(1L));
+ assertTrue(t1.isCompatible(t2));
+ }
+
+ @Test
+ public void testToString() {
+ Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
+ assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString());
+ }
+
+ @Test
+ public void testWithDelayToString() {
+ Trigger trigger = AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(5));
+
+ assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)",
+ trigger.toString());
+ }
+
+ @Test
+ public void testBuiltUpToString() {
+ Trigger trigger = AfterWatermark.pastEndOfWindow()
+ .withLateFirings(AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(10)));
+
+ String expected = "AfterWatermark.pastEndOfWindow()"
+ + ".withLateFirings(AfterProcessingTime"
+ + ".pastFirstElementInPane()"
+ + ".plusDelayOf(10 minutes))";
+
+ assertEquals(expected, trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
new file mode 100644
index 0000000..7e6e938
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterSynchronizedProcessingTime}.
+ */
+@RunWith(JUnit4.class)
+public class AfterSynchronizedProcessingTimeTest {
+
+ private Trigger underTest = new AfterSynchronizedProcessingTime();
+
+ @Test
+ public void testAfterProcessingTimeWithFixedWindows() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ FixedWindows.of(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+
+ // Timer at 15
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.advanceProcessingTime(new Instant(12));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ // Load up elements in the next window, timer at 17 for them
+ tester.injectElements(11, 12, 13);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Not quite time to fire
+ tester.advanceProcessingTime(new Instant(14));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
+ tester.injectElements(2, 3);
+
+ // Advance past the first timer and fire, finishing the first window
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.isMarkedFinished(firstWindow));
+
+ // The next window fires and finishes now
+ tester.advanceProcessingTime(new Instant(18));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(secondWindow);
+ assertTrue(tester.isMarkedFinished(secondWindow));
+ }
+
+ @Test
+ public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
+ AfterProcessingTime
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ Sessions.withGapDuration(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+ tester.injectElements(1); // in [1, 11), timer for 15
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ tester.advanceProcessingTime(new Instant(12));
+ tester.injectElements(3); // in [3, 13), timer for 17
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
+
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+ underTest.getWatermarkThatGuaranteesFiring(
+ new IntervalWindow(new Instant(0), new Instant(10))));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ assertEquals(underTest, underTest.getContinuationTrigger());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
new file mode 100644
index 0000000..084027b
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests the {@link AfterWatermark} triggers.
+ */
+@RunWith(JUnit4.class)
+public class AfterWatermarkTest {
+
+ @Mock private OnceTrigger mockEarly;
+ @Mock private OnceTrigger mockLate;
+
+ private SimpleTriggerTester<IntervalWindow> tester;
+ private static Trigger.TriggerContext anyTriggerContext() {
+ return Mockito.<Trigger.TriggerContext>any();
+ }
+ private static Trigger.OnElementContext anyElementContext() {
+ return Mockito.<Trigger.OnElementContext>any();
+ }
+
+ private void injectElements(int... elements) throws Exception {
+ for (int element : elements) {
+ doNothing().when(mockEarly).onElement(anyElementContext());
+ doNothing().when(mockLate).onElement(anyElementContext());
+ tester.injectElements(element);
+ }
+ }
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
+ throws Exception {
+
+ // Don't fire due to mock saying no
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ assertFalse(tester.shouldFire(window)); // not ready
+
+ // Fire due to mock trigger; early trigger is required to be a OnceTrigger
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertTrue(tester.shouldFire(window)); // ready
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testEarlyAndAtWatermark() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(mockEarly),
+ FixedWindows.of(Duration.millis(100)));
+
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ testRunningAsTrigger(mockEarly, window);
+
+ // Fire due to watermark
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testAtWatermarkAndLate() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withLateFirings(mockLate),
+ FixedWindows.of(Duration.millis(100)));
+
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ // No early firing, just double checking
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertFalse(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // Fire due to watermark
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ testRunningAsTrigger(mockLate, window);
+ }
+
+ @Test
+ public void testEarlyAndAtWatermarkAndLate() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(mockEarly)
+ .withLateFirings(mockLate),
+ FixedWindows.of(Duration.millis(100)));
+
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ testRunningAsTrigger(mockEarly, window);
+
+ // Fire due to watermark
+ when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ testRunningAsTrigger(mockLate, window);
+ }
+
+ /**
+ * Tests that if the EOW is finished in both as well as the merged window, then
+ * it is finished in the merged result.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testOnMergeAlreadyFinished() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterEach.inOrder(
+ AfterWatermark.pastEndOfWindow(),
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Confirm that we are on the second trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.injectElements(1);
+ tester.injectElements(5);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Merging should leave it finished
+ tester.mergeWindows();
+
+ // Confirm that we are on the second trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that the trigger rewinds to be non-finished in the merged window.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testOnMergeRewinds() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterEach.inOrder(
+ AfterWatermark.pastEndOfWindow(),
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Confirm that we are on the second trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(firstWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Merging should re-activate the watermark trigger in the merged window
+ tester.mergeWindows();
+
+ // Confirm that we are not on the second trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // And confirm that advancing the watermark fires again
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that if the EOW is finished in both as well as the merged window, then
+ * it is finished in the merged result.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+ .withLateFirings(AfterPane.elementCountAtLeast(1)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Confirm that we are on the late trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.injectElements(1);
+ tester.injectElements(5);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ tester.fireIfShouldFire(secondWindow);
+
+ // Merging should leave it on the late trigger
+ tester.mergeWindows();
+
+ // Confirm that we are on the late trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that the trigger rewinds to be non-finished in the merged window.
+ *
+ * <p>Because windows are discarded when a trigger finishes, we need to embed this
+ * in a sequence in order to check that it is re-activated. So this test is potentially
+ * sensitive to other triggers' correctness.
+ */
+ @Test
+ public void testEarlyAndLateOnMergeRewinds() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(100))
+ .withLateFirings(AfterPane.elementCountAtLeast(1)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ tester.injectElements(5);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+
+ // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Confirm that we are on the late trigger by probing
+ assertFalse(tester.shouldFire(firstWindow));
+ tester.injectElements(1);
+ assertTrue(tester.shouldFire(firstWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Merging should re-activate the early trigger in the merged window
+ tester.mergeWindows();
+
+ // Confirm that we are not on the second trigger by probing
+ assertFalse(tester.shouldFire(mergedWindow));
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // And confirm that advancing the watermark fires again
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ @Test
+ public void testFromEndOfWindowToString() {
+ Trigger trigger = AfterWatermark.pastEndOfWindow();
+ assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+ }
+
+ @Test
+ public void testEarlyFiringsToString() {
+ Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1"));
+
+ assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString());
+ }
+
+ @Test
+ public void testLateFiringsToString() {
+ Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1"));
+
+ assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString());
+ }
+
+ @Test
+ public void testEarlyAndLateFiringsToString() {
+ Trigger trigger =
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(StubTrigger.named("t1"))
+ .withLateFirings(StubTrigger.named("t2"));
+
+ assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
+ trigger.toString());
+ }
+
+ @Test
+ public void testToStringExcludesNeverTrigger() {
+ Trigger trigger =
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(Never.ever())
+ .withLateFirings(Never.ever());
+
+ assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
new file mode 100644
index 0000000..673e555
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link DefaultTrigger}, which should be equivalent to
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+ */
+@RunWith(JUnit4.class)
+public class DefaultTriggerTest {
+
+ SimpleTriggerTester<IntervalWindow> tester;
+
+ @Test
+ public void testDefaultTriggerFixedWindows() throws Exception {
+ tester = TriggerTester.forTrigger(
+ DefaultTrigger.of(),
+ FixedWindows.of(Duration.millis(100)));
+
+ tester.injectElements(
+ 1, // [0, 100)
+ 101); // [100, 200)
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200));
+
+ // Advance the watermark almost to the end of the first window.
+ tester.advanceInputWatermark(new Instant(99));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Advance watermark past end of the first window, which is then ready
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Fire, but the first window is still allowed to fire
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Advance watermark to 200, then both are ready
+ tester.advanceInputWatermark(new Instant(200));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+
+ assertFalse(tester.isMarkedFinished(firstWindow));
+ assertFalse(tester.isMarkedFinished(secondWindow));
+ }
+
+ @Test
+ public void testDefaultTriggerSlidingWindows() throws Exception {
+ tester = TriggerTester.forTrigger(
+ DefaultTrigger.of(),
+ SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
+
+ tester.injectElements(
+ 1, // [-50, 50), [0, 100)
+ 50); // [0, 100), [50, 150)
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100));
+ IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150));
+
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+
+ // At 50, the first becomes ready; it stays ready after firing
+ tester.advanceInputWatermark(new Instant(50));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+
+ // At 99, the first is still the only one ready
+ tester.advanceInputWatermark(new Instant(99));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+
+ // At 100, the first and second are ready
+ tester.advanceInputWatermark(new Instant(100));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(thirdWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ assertFalse(tester.isMarkedFinished(firstWindow));
+ assertFalse(tester.isMarkedFinished(secondWindow));
+ assertFalse(tester.isMarkedFinished(thirdWindow));
+ }
+
+ @Test
+ public void testDefaultTriggerSessions() throws Exception {
+ tester = TriggerTester.forTrigger(
+ DefaultTrigger.of(),
+ Sessions.withGapDuration(Duration.millis(100)));
+
+ tester.injectElements(
+ 1, // [1, 101)
+ 50); // [50, 150)
+ tester.mergeWindows();
+
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101));
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150));
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150));
+
+ // Not ready in any window yet
+ tester.advanceInputWatermark(new Instant(100));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // The first window is "ready": the caller owns knowledge of which windows are merged away
+ tester.advanceInputWatermark(new Instant(149));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // Now ready on all windows
+ tester.advanceInputWatermark(new Instant(150));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertTrue(tester.shouldFire(secondWindow));
+ assertTrue(tester.shouldFire(mergedWindow));
+
+ // Ensure it repeats
+ tester.fireIfShouldFire(mergedWindow);
+ assertTrue(tester.shouldFire(mergedWindow));
+
+ assertFalse(tester.isMarkedFinished(mergedWindow));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
+ new IntervalWindow(new Instant(0), new Instant(10))));
+ assertEquals(GlobalWindow.INSTANCE.maxTimestamp(),
+ DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
new file mode 100644
index 0000000..1e3a1ff
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ExecutableTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class ExecutableTriggerTest {
+
+ @Test
+ public void testIndexAssignmentLeaf() throws Exception {
+ StubTrigger t1 = new StubTrigger();
+ ExecutableTrigger executable = ExecutableTrigger.create(t1);
+ assertEquals(0, executable.getTriggerIndex());
+ }
+
+ @Test
+ public void testIndexAssignmentOneLevel() throws Exception {
+ StubTrigger t1 = new StubTrigger();
+ StubTrigger t2 = new StubTrigger();
+ StubTrigger t = new StubTrigger(t1, t2);
+
+ ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+ assertEquals(0, executable.getTriggerIndex());
+ assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+ assertSame(t1, executable.subTriggers().get(0).getSpec());
+ assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
+ assertSame(t2, executable.subTriggers().get(1).getSpec());
+ }
+
+ @Test
+ public void testIndexAssignmentTwoLevel() throws Exception {
+ StubTrigger t11 = new StubTrigger();
+ StubTrigger t12 = new StubTrigger();
+ StubTrigger t13 = new StubTrigger();
+ StubTrigger t14 = new StubTrigger();
+ StubTrigger t21 = new StubTrigger();
+ StubTrigger t22 = new StubTrigger();
+ StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
+ StubTrigger t2 = new StubTrigger(t21, t22);
+ StubTrigger t = new StubTrigger(t1, t2);
+
+ ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+ assertEquals(0, executable.getTriggerIndex());
+ assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+ assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
+ assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
+
+ assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
+ assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
+ assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
+ assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
+ assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
+ assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
+ }
+
+ private static class StubTrigger extends Trigger {
+
+ @SafeVarargs
+ protected StubTrigger(Trigger... subTriggers) {
+ super(Arrays.asList(subTriggers));
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception { }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception { }
+
+ @Override
+ public void clear(TriggerContext c) throws Exception {
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ public boolean isCompatible(Trigger other) {
+ return false;
+ }
+
+ @Override
+ public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return this;
+ }
+
+ @Override
+ public boolean shouldFire(TriggerContext c) {
+ return false;
+ }
+
+ @Override
+ public void onFire(TriggerContext c) { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
new file mode 100644
index 0000000..7f74620
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersBitSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersBitSetTest {
+ /**
+ * Tests that after a trigger is set to finished, it reads back as finished.
+ */
+ @Test
+ public void testSetGet() {
+ FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
+ }
+
+ /**
+ * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
+ * others.
+ */
+ @Test
+ public void testClearRecursively() {
+ FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
+ }
+
+ @Test
+ public void testCopy() throws Exception {
+ FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10);
+ assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
new file mode 100644
index 0000000..a66f74f
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+
+/**
+ * Generalized tests for {@link FinishedTriggers} implementations.
+ */
+public class FinishedTriggersProperties {
+ /**
+ * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set
+ * finished, it is correctly reported as finished.
+ */
+ public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ assertFalse(finishedSet.isFinished(trigger));
+ finishedSet.setFinished(trigger, true);
+ assertTrue(finishedSet.isFinished(trigger));
+ }
+
+ /**
+ * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly
+ * reported as finished.
+ */
+ public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
+ ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
+ AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
+ AfterAll.of(
+ AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
+
+ verifyGetAfterSet(finishedSet, trigger);
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0));
+ }
+
+ /**
+ * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
+ * others.
+ */
+ public static void verifyClearRecursively(FinishedTriggers finishedSet) {
+ ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
+ AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
+ AfterAll.of(
+ AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
+
+ // Set them all finished. This method is not on a trigger as it makes no sense outside tests.
+ setFinishedRecursively(finishedSet, trigger);
+ assertTrue(finishedSet.isFinished(trigger));
+ assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
+ assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
+ assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
+
+ // Clear just the second AfterAll
+ finishedSet.clearRecursively(trigger.subTriggers().get(1));
+
+ // Check that the first and all that are still finished
+ assertTrue(finishedSet.isFinished(trigger));
+ verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
+ verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
+ }
+
+ private static void setFinishedRecursively(
+ FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ finishedSet.setFinished(trigger, true);
+ for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+ setFinishedRecursively(finishedSet, subTrigger);
+ }
+ }
+
+ private static void verifyFinishedRecursively(
+ FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ assertTrue(finishedSet.isFinished(trigger));
+ for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+ verifyFinishedRecursively(finishedSet, subTrigger);
+ }
+ }
+
+ private static void verifyUnfinishedRecursively(
+ FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ assertFalse(finishedSet.isFinished(trigger));
+ for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+ verifyUnfinishedRecursively(finishedSet, subTrigger);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
new file mode 100644
index 0000000..072d264
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersSetTest {
+ /**
+ * Tests that after a trigger is set to finished, it reads back as finished.
+ */
+ @Test
+ public void testSetGet() {
+ FinishedTriggersProperties.verifyGetAfterSet(
+ FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
+ }
+
+ /**
+ * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
+ * others.
+ */
+ @Test
+ public void testClearRecursively() {
+ FinishedTriggersProperties.verifyClearRecursively(
+ FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
+ }
+
+ @Test
+ public void testCopy() throws Exception {
+ FinishedTriggersSet finishedSet =
+ FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+ assertThat(finishedSet.copy().getFinishedTriggers(),
+ not(theInstance(finishedSet.getFinishedTriggers())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
new file mode 100644
index 0000000..fb2b4d5
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Never}.
+ */
+@RunWith(JUnit4.class)
+public class NeverTest {
+ private SimpleTriggerTester<IntervalWindow> triggerTester;
+
+ @Before
+ public void setup() throws Exception {
+ triggerTester =
+ TriggerTester.forTrigger(
+ Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
+ }
+
+ @Test
+ public void falseAfterEndOfWindow() throws Exception {
+ triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
+ IntervalWindow window =
+ new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
+ assertThat(triggerTester.shouldFire(window), is(false));
+ triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ assertThat(triggerTester.shouldFire(window), is(false));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
new file mode 100644
index 0000000..7289d97
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link OrFinallyTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class OrFinallyTriggerTest {
+
+ private SimpleTriggerTester<IntervalWindow> tester;
+
+ /**
+ * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+ * fires and finishes, the {@code OrFinally} also fires and finishes.
+ */
+ @Test
+ public void testActualFiresAndFinishes() throws Exception {
+ tester = TriggerTester.forTrigger(
+ new OrFinallyTrigger(
+ AfterPane.elementCountAtLeast(2),
+ AfterPane.elementCountAtLeast(100)),
+ FixedWindows.of(Duration.millis(100)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ // Not yet firing
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The actual fires and finishes
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ /**
+ * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
+ * fires but does not finish, the {@code OrFinally} also fires and also does not
+ * finish.
+ */
+ @Test
+ public void testActualFiresOnly() throws Exception {
+ tester = TriggerTester.forTrigger(
+ new OrFinallyTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+ AfterPane.elementCountAtLeast(100)),
+ FixedWindows.of(Duration.millis(100)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ // Not yet firing
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The actual fires but does not finish
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // And again
+ tester.injectElements(3, 4);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+ }
+
+ /**
+ * Tests that if the first trigger rewinds to be non-finished in the merged window,
+ * then it becomes the currently active trigger again, with real triggers.
+ */
+ @Test
+ public void testShouldFireAfterMerge() throws Exception {
+ tester = TriggerTester.forTrigger(
+ AfterEach.inOrder(
+ AfterPane.elementCountAtLeast(5)
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ // Finished the orFinally in the first window
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Set up second window where it is not done
+ tester.injectElements(5);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Merge them, if the merged window were on the second trigger, it would be ready
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // Now adding 3 more makes the main trigger ready to fire
+ tester.injectElements(1, 2, 3, 4, 5);
+ tester.mergeWindows();
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Tests that for {@code OrFinally(actual, until)} when {@code actual}
+ * fires but does not finish, then {@code until} fires and finishes, the
+ * whole thing fires and finished.
+ */
+ @Test
+ public void testActualFiresButUntilFinishes() throws Exception {
+ tester = TriggerTester.forTrigger(
+ new OrFinallyTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+ AfterPane.elementCountAtLeast(3)),
+ FixedWindows.of(Duration.millis(10)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+ // Before any firing
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The actual fires but doesn't finish
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // The until fires and finishes; the trigger is finished
+ tester.injectElements(3);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testFireDeadline() throws Exception {
+ BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+ assertEquals(new Instant(9),
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+ .getWatermarkThatGuaranteesFiring(window));
+ assertEquals(new Instant(9), Repeatedly.forever(AfterWatermark.pastEndOfWindow())
+ .orFinally(AfterPane.elementCountAtLeast(1))
+ .getWatermarkThatGuaranteesFiring(window));
+ assertEquals(new Instant(9), Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+ .orFinally(AfterWatermark.pastEndOfWindow())
+ .getWatermarkThatGuaranteesFiring(window));
+ assertEquals(new Instant(9),
+ AfterPane.elementCountAtLeast(100)
+ .orFinally(AfterWatermark.pastEndOfWindow())
+ .getWatermarkThatGuaranteesFiring(window));
+
+ assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+ .orFinally(AfterPane.elementCountAtLeast(10))
+ .getWatermarkThatGuaranteesFiring(window));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane();
+ OnceTrigger triggerB = AfterWatermark.pastEndOfWindow();
+ Trigger aOrFinallyB = triggerA.orFinally(triggerB);
+ Trigger bOrFinallyA = triggerB.orFinally(triggerA);
+ assertEquals(
+ Repeatedly.forever(
+ triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
+ aOrFinallyB.getContinuationTrigger());
+ assertEquals(
+ Repeatedly.forever(
+ triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
+ bOrFinallyA.getContinuationTrigger());
+ }
+
+ @Test
+ public void testToString() {
+ Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2"));
+ assertEquals("t1.orFinally(t2)", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
new file mode 100644
index 0000000..6e8930d
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link Repeatedly}.
+ */
+@RunWith(JUnit4.class)
+public class RepeatedlyTest {
+
+ @Mock private Trigger mockTrigger;
+ private SimpleTriggerTester<IntervalWindow> tester;
+ private static Trigger.TriggerContext anyTriggerContext() {
+ return Mockito.<Trigger.TriggerContext>any();
+ }
+
+ public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception {
+ MockitoAnnotations.initMocks(this);
+ tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn);
+ }
+
+ /**
+ * Tests that onElement correctly passes the data on to the subtrigger.
+ */
+ @Test
+ public void testOnElement() throws Exception {
+ setUp(FixedWindows.of(Duration.millis(10)));
+ tester.injectElements(37);
+ verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
+ }
+
+ /**
+ * Tests that the repeatedly is ready to fire whenever the subtrigger is ready.
+ */
+ @Test
+ public void testShouldFire() throws Exception {
+ setUp(FixedWindows.of(Duration.millis(10)));
+
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
+
+ when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
+ .thenReturn(false);
+ assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
+ }
+
+ /**
+ * Tests that the watermark that guarantees firing is that of the subtrigger.
+ */
+ @Test
+ public void testFireDeadline() throws Exception {
+ setUp(FixedWindows.of(Duration.millis(10)));
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ Instant arbitraryInstant = new Instant(34957849);
+
+ when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any()))
+ .thenReturn(arbitraryInstant);
+
+ assertThat(
+ Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window),
+ equalTo(arbitraryInstant));
+ }
+
+ @Test
+ public void testContinuation() throws Exception {
+ Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
+ Trigger repeatedly = Repeatedly.forever(trigger);
+ assertEquals(
+ Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger());
+ assertEquals(
+ Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()),
+ repeatedly.getContinuationTrigger().getContinuationTrigger());
+ }
+
+ @Test
+ public void testShouldFireAfterMerge() throws Exception {
+ tester = TriggerTester.forTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ tester.injectElements(5);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Merge them, if the merged window were on the second trigger, it would be ready
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ @Test
+ public void testRepeatedlyAfterFirstElementCount() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(
+ AfterFirst.of(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(15)),
+ AfterPane.elementCountAtLeast(5))),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2, 3, 4, 5);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+ @Test
+ public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(
+ AfterFirst.of(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(15)),
+ AfterPane.elementCountAtLeast(5))),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+ @Test
+ public void testRepeatedlyElementCount() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2, 3, 4, 5);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+ @Test
+ public void testRepeatedlyProcessingTime() throws Exception {
+ SimpleTriggerTester<GlobalWindow> tester =
+ TriggerTester.forTrigger(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(15))),
+ new GlobalWindows());
+
+ GlobalWindow window = GlobalWindow.INSTANCE;
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.shouldFire(window));
+ }
+
+
+ @Test
+ public void testToString() {
+ Trigger trigger = Repeatedly.forever(new StubTrigger() {
+ @Override
+ public String toString() {
+ return "innerTrigger";
+ }
+ });
+
+ assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
new file mode 100644
index 0000000..83077f4
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReshuffleTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class ReshuffleTriggerTest {
+
+ /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */
+ public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() {
+ return new ReshuffleTrigger<>();
+ }
+
+ @Test
+ public void testShouldFire() throws Exception {
+ TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
+ new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
+ IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400));
+ assertTrue(tester.shouldFire(arbitraryWindow));
+ }
+
+ @Test
+ public void testOnTimer() throws Exception {
+ TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
+ new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
+ IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200));
+ tester.fireIfShouldFire(arbitraryWindow);
+ assertFalse(tester.isMarkedFinished(arbitraryWindow));
+ }
+
+ @Test
+ public void testToString() {
+ Trigger trigger = new ReshuffleTrigger<>();
+ assertEquals("ReshuffleTrigger()", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
new file mode 100644
index 0000000..b258a79
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.joda.time.Instant;
+
+/**
+ * No-op {@link OnceTrigger} implementation for testing.
+ */
+abstract class StubTrigger extends Trigger.OnceTrigger {
+ /**
+ * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}.
+ */
+ static StubTrigger named(final String name) {
+ return new StubTrigger() {
+ @Override
+ public String toString() {
+ return name;
+ }
+ };
+ }
+
+ protected StubTrigger() {
+ super(Lists.<Trigger>newArrayList());
+ }
+
+ @Override
+ protected void onOnlyFiring(TriggerContext context) throws Exception {
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ }
+
+ @Override
+ public boolean shouldFire(TriggerContext context) throws Exception {
+ return false;
+ }
+
+ @Override
+ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return null;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
new file mode 100644
index 0000000..cfc03b2
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Trigger}.
+ */
+@RunWith(JUnit4.class)
+public class TriggerTest {
+
+ @Test
+ public void testTriggerToString() throws Exception {
+ assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString());
+ assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())",
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString());
+ }
+
+ @Test
+ public void testIsCompatible() throws Exception {
+ assertTrue(new Trigger1(null).isCompatible(new Trigger1(null)));
+ assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))
+ .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
+
+ assertFalse(new Trigger1(null).isCompatible(new Trigger2(null)));
+ assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null)))
+ .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
+ }
+
+ private static class Trigger1 extends Trigger {
+
+ private Trigger1(List<Trigger> subTriggers) {
+ super(subTriggers);
+ }
+
+ @Override
+ public void onElement(Trigger.OnElementContext c) { }
+
+ @Override
+ public void onMerge(Trigger.OnMergeContext c) { }
+
+ @Override
+ protected Trigger getContinuationTrigger(
+ List<Trigger> continuationTriggers) {
+ return null;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return null;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception { }
+ }
+
+ private static class Trigger2 extends Trigger {
+
+ private Trigger2(List<Trigger> subTriggers) {
+ super(subTriggers);
+ }
+
+ @Override
+ public void onElement(Trigger.OnElementContext c) { }
+
+ @Override
+ public void onMerge(Trigger.OnMergeContext c) { }
+
+ @Override
+ protected Trigger getContinuationTrigger(
+ List<Trigger> continuationTriggers) {
+ return null;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return null;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception { }
+ }
+}