You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:36 UTC
[02/17] incubator-beam git commit: Rename runners-core Trigger to
TriggerStateMachine
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
deleted file mode 100644
index fb2b4d5..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link Never}.
- */
-@RunWith(JUnit4.class)
-public class NeverTest {
- private SimpleTriggerTester<IntervalWindow> triggerTester;
-
- @Before
- public void setup() throws Exception {
- triggerTester =
- TriggerTester.forTrigger(
- Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
- }
-
- @Test
- public void falseAfterEndOfWindow() throws Exception {
- triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
- IntervalWindow window =
- new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
- assertThat(triggerTester.shouldFire(window), is(false));
- triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
- assertThat(triggerTester.shouldFire(window), is(false));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
deleted file mode 100644
index 7289d97..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link OrFinallyTrigger}.
- */
-@RunWith(JUnit4.class)
-public class OrFinallyTriggerTest {
-
- private SimpleTriggerTester<IntervalWindow> tester;
-
- /**
- * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
- * fires and finishes, the {@code OrFinally} also fires and finishes.
- */
- @Test
- public void testActualFiresAndFinishes() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- AfterPane.elementCountAtLeast(2),
- AfterPane.elementCountAtLeast(100)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // Not yet firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires and finishes
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
- * fires but does not finish, the {@code OrFinally} also fires and also does not
- * finish.
- */
- @Test
- public void testActualFiresOnly() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- AfterPane.elementCountAtLeast(100)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // Not yet firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires but does not finish
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // And again
- tester.injectElements(3, 4);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that if the first trigger rewinds to be non-finished in the merged window,
- * then it becomes the currently active trigger again, with real triggers.
- */
- @Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterPane.elementCountAtLeast(5)
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- // Finished the orFinally in the first window
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Set up second window where it is not done
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them, if the merged window were on the second trigger, it would be ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now adding 5 more elements makes the main trigger ready to fire
- tester.injectElements(1, 2, 3, 4, 5);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that for {@code OrFinally(actual, until)} when {@code actual}
- * fires but does not finish, then {@code until} fires and finishes, the
- * whole thing fires and finishes.
- */
- @Test
- public void testActualFiresButUntilFinishes() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- AfterPane.elementCountAtLeast(3)),
- FixedWindows.of(Duration.millis(10)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- // Before any firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires but doesn't finish
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // The until fires and finishes; the trigger is finished
- tester.injectElements(3);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- assertEquals(new Instant(9),
- Repeatedly.forever(AfterWatermark.pastEndOfWindow())
- .getWatermarkThatGuaranteesFiring(window));
- assertEquals(new Instant(9), Repeatedly.forever(AfterWatermark.pastEndOfWindow())
- .orFinally(AfterPane.elementCountAtLeast(1))
- .getWatermarkThatGuaranteesFiring(window));
- assertEquals(new Instant(9), Repeatedly.forever(AfterPane.elementCountAtLeast(1))
- .orFinally(AfterWatermark.pastEndOfWindow())
- .getWatermarkThatGuaranteesFiring(window));
- assertEquals(new Instant(9),
- AfterPane.elementCountAtLeast(100)
- .orFinally(AfterWatermark.pastEndOfWindow())
- .getWatermarkThatGuaranteesFiring(window));
-
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))
- .orFinally(AfterPane.elementCountAtLeast(10))
- .getWatermarkThatGuaranteesFiring(window));
- }
-
- @Test
- public void testContinuation() throws Exception {
- OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane();
- OnceTrigger triggerB = AfterWatermark.pastEndOfWindow();
- Trigger aOrFinallyB = triggerA.orFinally(triggerB);
- Trigger bOrFinallyA = triggerB.orFinally(triggerA);
- assertEquals(
- Repeatedly.forever(
- triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
- aOrFinallyB.getContinuationTrigger());
- assertEquals(
- Repeatedly.forever(
- triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
- bOrFinallyA.getContinuationTrigger());
- }
-
- @Test
- public void testToString() {
- Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2"));
- assertEquals("t1.orFinally(t2)", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
deleted file mode 100644
index 6e8930d..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link Repeatedly}.
- */
-@RunWith(JUnit4.class)
-public class RepeatedlyTest {
-
- @Mock private Trigger mockTrigger;
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
-
- public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception {
- MockitoAnnotations.initMocks(this);
- tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn);
- }
-
- /**
- * Tests that onElement correctly passes the data on to the subtrigger.
- */
- @Test
- public void testOnElement() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
- tester.injectElements(37);
- verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
- }
-
- /**
- * Tests that the repeatedly is ready to fire whenever the subtrigger is ready.
- */
- @Test
- public void testShouldFire() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
-
- when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
- .thenReturn(false);
- assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
- }
-
- /**
- * Tests that the watermark that guarantees firing is that of the subtrigger.
- */
- @Test
- public void testFireDeadline() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- Instant arbitraryInstant = new Instant(34957849);
-
- when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any()))
- .thenReturn(arbitraryInstant);
-
- assertThat(
- Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window),
- equalTo(arbitraryInstant));
- }
-
- @Test
- public void testContinuation() throws Exception {
- Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
- Trigger repeatedly = Repeatedly.forever(trigger);
- assertEquals(
- Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger());
- assertEquals(
- Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()),
- repeatedly.getContinuationTrigger().getContinuationTrigger());
- }
-
- @Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them; the merged window holds both elements, reaching the count of 2, so it is ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testRepeatedlyAfterFirstElementCount() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15)),
- AfterPane.elementCountAtLeast(5))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2, 3, 4, 5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15)),
- AfterPane.elementCountAtLeast(5))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyElementCount() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2, 3, 4, 5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyProcessingTime() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
-
- @Test
- public void testToString() {
- Trigger trigger = Repeatedly.forever(new StubTrigger() {
- @Override
- public String toString() {
- return "innerTrigger";
- }
- });
-
- assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
deleted file mode 100644
index 83077f4..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ReshuffleTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ReshuffleTriggerTest {
-
- /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */
- public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() {
- return new ReshuffleTrigger<>();
- }
-
- @Test
- public void testShouldFire() throws Exception {
- TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
- new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
- IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400));
- assertTrue(tester.shouldFire(arbitraryWindow));
- }
-
- @Test
- public void testOnTimer() throws Exception {
- TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
- new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
- IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200));
- tester.fireIfShouldFire(arbitraryWindow);
- assertFalse(tester.isMarkedFinished(arbitraryWindow));
- }
-
- @Test
- public void testToString() {
- Trigger trigger = new ReshuffleTrigger<>();
- assertEquals("ReshuffleTrigger()", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
deleted file mode 100644
index b258a79..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.Lists;
-import java.util.List;
-import org.joda.time.Instant;
-
-/**
- * No-op {@link OnceTrigger} implementation for testing.
- */
-abstract class StubTrigger extends Trigger.OnceTrigger {
- /**
- * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}.
- */
- static StubTrigger named(final String name) {
- return new StubTrigger() {
- @Override
- public String toString() {
- return name;
- }
- };
- }
-
- protected StubTrigger() {
- super(Lists.<Trigger>newArrayList());
- }
-
- @Override
- protected void onOnlyFiring(TriggerContext context) throws Exception {
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- }
-
- @Override
- public boolean shouldFire(TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return null;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
deleted file mode 100644
index cfc03b2..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.List;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link Trigger}.
- */
-@RunWith(JUnit4.class)
-public class TriggerTest {
-
- @Test
- public void testTriggerToString() throws Exception {
- assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString());
- assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())",
- Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString());
- }
-
- @Test
- public void testIsCompatible() throws Exception {
- assertTrue(new Trigger1(null).isCompatible(new Trigger1(null)));
- assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))
- .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
-
- assertFalse(new Trigger1(null).isCompatible(new Trigger2(null)));
- assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null)))
- .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
- }
-
- private static class Trigger1 extends Trigger {
-
- private Trigger1(List<Trigger> subTriggers) {
- super(subTriggers);
- }
-
- @Override
- public void onElement(Trigger.OnElementContext c) { }
-
- @Override
- public void onMerge(Trigger.OnMergeContext c) { }
-
- @Override
- protected Trigger getContinuationTrigger(
- List<Trigger> continuationTriggers) {
- return null;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return null;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
- }
-
- private static class Trigger2 extends Trigger {
-
- private Trigger2(List<Trigger> subTriggers) {
- super(subTriggers);
- }
-
- @Override
- public void onElement(Trigger.OnElementContext c) { }
-
- @Override
- public void onMerge(Trigger.OnMergeContext c) { }
-
- @Override
- protected Trigger getContinuationTrigger(
- List<Trigger> continuationTriggers) {
- return null;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return null;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
deleted file mode 100644
index 5fe17ad..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Test utility that runs a {@link Trigger}, using in-memory stub implementation to provide
- * the {@link StateInternals}.
- *
- * @param <W> The type of windows being used.
- */
-public class TriggerTester<InputT, W extends BoundedWindow> {
-
- /**
- * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps
- * can be conflated. Today, triggers should not observed the element type, so this is the
- * only trigger tester that needs to be used.
- */
- public static class SimpleTriggerTester<W extends BoundedWindow>
- extends TriggerTester<Integer, W> {
-
- private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
- super(windowingStrategy);
- }
-
- public void injectElements(int... values) throws Exception {
- List<TimestampedValue<Integer>> timestampedValues =
- Lists.newArrayListWithCapacity(values.length);
- for (int value : values) {
- timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
- }
- injectElements(timestampedValues);
- }
-
- public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception {
- return new SimpleTriggerTester<>(
- windowingStrategy.withAllowedLateness(allowedLateness));
- }
- }
-
- protected final WindowingStrategy<Object, W> windowingStrategy;
-
- private final TestInMemoryStateInternals<?> stateInternals =
- new TestInMemoryStateInternals<Object>(null /* key */);
- private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
- private final TriggerContextFactory<W> contextFactory;
- private final WindowFn<Object, W> windowFn;
- private final ActiveWindowSet<W> activeWindows;
- private final Map<W, W> windowToMergeResult;
-
- /**
- * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link Trigger}
- * under test.
- */
- private final ExecutableTrigger executableTrigger;
-
- /**
- * A map from a window and trigger to whether that trigger is finished for the window.
- */
- private final Map<W, FinishedTriggers> finishedSets;
-
- public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
- Trigger trigger, WindowFn<Object, W> windowFn)
- throws Exception {
- WindowingStrategy<Object, W> windowingStrategy =
- WindowingStrategy.of(windowFn).withTrigger(trigger)
- // Merging requires accumulation mode or early firings can break up a session.
- // Not currently an issue with the tester (because we never GC) but we don't want
- // mystery failures due to violating this need.
- .withMode(windowFn.isNonMerging()
- ? AccumulationMode.DISCARDING_FIRED_PANES
- : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
- return new SimpleTriggerTester<>(windowingStrategy);
- }
-
- public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger(
- Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
- WindowingStrategy<Object, W> strategy =
- WindowingStrategy.of(windowFn).withTrigger(trigger)
- // Merging requires accumulation mode or early firings can break up a session.
- // Not currently an issue with the tester (because we never GC) but we don't want
- // mystery failures due to violating this need.
- .withMode(windowFn.isNonMerging()
- ? AccumulationMode.DISCARDING_FIRED_PANES
- : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
- return new TriggerTester<>(strategy);
- }
-
- protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
- this.windowingStrategy = windowingStrategy;
- this.windowFn = windowingStrategy.getWindowFn();
- this.executableTrigger = windowingStrategy.getTrigger();
- this.finishedSets = new HashMap<>();
-
- this.activeWindows =
- windowFn.isNonMerging()
- ? new NonMergingActiveWindowSet<W>()
- : new MergingActiveWindowSet<W>(windowFn, stateInternals);
- this.windowToMergeResult = new HashMap<>();
-
- this.contextFactory =
- new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows);
- }
-
- /**
- * Instructs the trigger to clear its state for the given window.
- */
- public void clearState(W window) throws Exception {
- executableTrigger.invokeClear(contextFactory.base(window,
- new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window)));
- }
-
- /**
- * Asserts that the trigger has actually cleared all of its state for the given window. Since
- * the trigger under test is the root, this makes the assert for all triggers regardless
- * of their position in the trigger tree.
- */
- public void assertCleared(W window) {
- for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) {
- if (untypedNamespace instanceof WindowAndTriggerNamespace) {
- @SuppressWarnings("unchecked")
- WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace;
- if (namespace.getWindow().equals(window)) {
- Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
- assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty());
- }
- }
- }
- }
-
- /**
- * Returns {@code true} if the {@link Trigger} under test is finished for the given window.
- */
- public boolean isMarkedFinished(W window) {
- FinishedTriggers finishedSet = finishedSets.get(window);
- if (finishedSet == null) {
- return false;
- }
-
- return finishedSet.isFinished(executableTrigger);
- }
-
- private StateNamespace windowNamespace(W window) {
- return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window));
- }
-
- /**
- * Advance the input watermark to the specified time, then advance the output watermark as far as
- * possible.
- */
- public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
- // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
- timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark);
- }
-
- /** Advance the processing time to the specified time. */
- public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
- // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
- timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime);
- }
-
- /**
- * Inject all the timestamped values (after passing through the window function) as if they
- * arrived in a single chunk of a bundle (or work-unit).
- */
- @SafeVarargs
- public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
- injectElements(Arrays.asList(values));
- }
-
- public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception {
- for (TimestampedValue<InputT> value : values) {
- WindowTracing.trace("TriggerTester.injectElements: {}", value);
- }
-
- List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size());
-
- for (TimestampedValue<InputT> input : values) {
- try {
- InputT value = input.getValue();
- Instant timestamp = input.getTimestamp();
- Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(
- windowFn, value, timestamp, GlobalWindow.INSTANCE));
-
- for (W window : assignedWindows) {
- activeWindows.addActiveForTesting(window);
-
- // Today, triggers assume onTimer firing at the watermark time, whether or not they
- // explicitly set the timer themselves. So this tester must set it.
- timerInternals.setTimer(
- TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
- }
-
- windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- for (WindowedValue<InputT> windowedValue : windowedValues) {
- for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
- // SDK is responsible for type safety
- @SuppressWarnings("unchecked")
- W window = mergeResult((W) untypedWindow);
-
- Trigger.OnElementContext context = contextFactory.createOnElementContext(window,
- new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(),
- executableTrigger, getFinishedSet(window));
-
- if (!context.trigger().isFinished()) {
- executableTrigger.invokeOnElement(context);
- }
- }
- }
- }
-
- public boolean shouldFire(W window) throws Exception {
- Trigger.TriggerContext context = contextFactory.base(
- window,
- new TestTimers(windowNamespace(window)),
- executableTrigger, getFinishedSet(window));
- executableTrigger.getSpec().prefetchShouldFire(context.state());
- return executableTrigger.invokeShouldFire(context);
- }
-
- public void fireIfShouldFire(W window) throws Exception {
- Trigger.TriggerContext context = contextFactory.base(
- window,
- new TestTimers(windowNamespace(window)),
- executableTrigger, getFinishedSet(window));
-
- executableTrigger.getSpec().prefetchShouldFire(context.state());
- if (executableTrigger.invokeShouldFire(context)) {
- executableTrigger.getSpec().prefetchOnFire(context.state());
- executableTrigger.invokeOnFire(context);
- if (context.trigger().isFinished()) {
- activeWindows.remove(window);
- executableTrigger.invokeClear(context);
- }
- }
- }
-
- public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) {
- getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value);
- }
-
- /**
- * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge
- * events on to the trigger under test. Does not persist the fact that merging happened,
- * since it is just to test the trigger's {@code OnMerge} method.
- */
- public final void mergeWindows() throws Exception {
- windowToMergeResult.clear();
- activeWindows.merge(new MergeCallback<W>() {
- @Override
- public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {}
-
- @Override
- public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
- List<W> activeToBeMerged = new ArrayList<W>();
- for (W window : toBeMerged) {
- windowToMergeResult.put(window, mergeResult);
- if (activeWindows.isActive(window)) {
- activeToBeMerged.add(window);
- }
- }
- Map<W, FinishedTriggers> mergingFinishedSets =
- Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
- for (W oldWindow : activeToBeMerged) {
- mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
- }
- executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
- new TestTimers(windowNamespace(mergeResult)), executableTrigger,
- getFinishedSet(mergeResult), mergingFinishedSets));
- timerInternals.setTimer(TimerData.of(
- windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
- }
- });
- }
-
- public W mergeResult(W window) {
- W result = windowToMergeResult.get(window);
- return result == null ? window : result;
- }
-
- private FinishedTriggers getFinishedSet(W window) {
- FinishedTriggers finishedSet = finishedSets.get(window);
- if (finishedSet == null) {
- finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
- finishedSets.put(window, finishedSet);
- }
- return finishedSet;
- }
-
- private static class TestAssignContext<W extends BoundedWindow>
- extends WindowFn<Object, W>.AssignContext {
- private Object element;
- private Instant timestamp;
- private BoundedWindow window;
-
- public TestAssignContext(
- WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
- windowFn.super();
- this.element = element;
- this.timestamp = timestamp;
- this.window = window;
- }
-
- @Override
- public Object element() {
- return element;
- }
-
- @Override
- public Instant timestamp() {
- return timestamp;
- }
-
- @Override
- public BoundedWindow window() {
- return window;
- }
- }
-
- private class TestTimers implements Timers {
- private final StateNamespace namespace;
-
- public TestTimers(StateNamespace namespace) {
- checkArgument(namespace instanceof WindowNamespace);
- this.namespace = namespace;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timerInternals.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timerInternals.currentSynchronizedProcessingTime();
- }
-
- @Override
- public Instant currentEventTime() {
- return timerInternals.currentInputWatermarkTime();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java
new file mode 100644
index 0000000..907292c
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link AfterAllStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterAllStateMachineTest {
+
+ private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+
+ @Test
+ public void testT1FiresFirst() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterAllStateMachine.of(
+ AfterPaneStateMachine.elementCountAtLeast(1),
+ AfterPaneStateMachine.elementCountAtLeast(2)),
+ FixedWindows.of(Duration.millis(100)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testT2FiresFirst() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterAllStateMachine.of(
+ AfterPaneStateMachine.elementCountAtLeast(2),
+ AfterPaneStateMachine.elementCountAtLeast(1)),
+ FixedWindows.of(Duration.millis(100)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ /**
+ * Tests that the AfterAll properly unsets finished bits when a merge causes it to become
+ * unfinished.
+ */
+ @Test
+ public void testOnMergeRewinds() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterEachStateMachine.inOrder(
+ AfterAllStateMachine.of(
+ AfterWatermarkStateMachine.pastEndOfWindow(),
+ AfterPaneStateMachine.elementCountAtLeast(1)),
+ RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+
+ tester.injectElements(5);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+
+ // Finish the AfterAll in the first window
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Merge them; the AfterAll should not be finished
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+ assertFalse(tester.isMarkedFinished(mergedWindow));
+
+ // Confirm that we are back on the first trigger by probing that it is not ready to fire
+ // after an element (with merging)
+ tester.injectElements(3);
+ tester.mergeWindows();
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // Fire the AfterAll in the merged window
+ tester.advanceInputWatermark(new Instant(15));
+ assertTrue(tester.shouldFire(mergedWindow));
+ tester.fireIfShouldFire(mergedWindow);
+
+ // Confirm that we are on the second trigger by probing
+ tester.injectElements(2);
+ tester.mergeWindows();
+ assertTrue(tester.shouldFire(mergedWindow));
+ tester.fireIfShouldFire(mergedWindow);
+ tester.injectElements(2);
+ tester.mergeWindows();
+ assertTrue(tester.shouldFire(mergedWindow));
+ tester.fireIfShouldFire(mergedWindow);
+ }
+
+ @Test
+ public void testToString() {
+ TriggerStateMachine trigger =
+ AfterAllStateMachine.of(
+ StubTriggerStateMachine.named("t1"), StubTriggerStateMachine.named("t2"));
+ assertEquals("AfterAll.of(t1, t2)", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java
new file mode 100644
index 0000000..4fae8f1
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link AfterEachStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterEachStateMachineTest {
+
+ private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+
+ @Before
+ public void initMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ /**
+ * Tests that the {@link AfterEachStateMachine} trigger fires and finishes the first trigger then
+ * the second.
+ */
+ @Test
+ public void testAfterEachInSequence() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterEachStateMachine.inOrder(
+ RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2))
+ .orFinally(AfterPaneStateMachine.elementCountAtLeast(3)),
+ RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(5))
+ .orFinally(AfterWatermarkStateMachine.pastEndOfWindow())),
+ FixedWindows.of(Duration.millis(10)));
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+ // AfterCount(2) not ready
+ tester.injectElements(1);
+ assertFalse(tester.shouldFire(window));
+
+ // AfterCount(2) ready, not finished
+ tester.injectElements(2);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // orFinally(AfterCount(3)) ready and will finish the first
+ tester.injectElements(1, 2, 3);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // Now running as the second trigger
+ assertFalse(tester.shouldFire(window));
+ // This quantity of elements would fire and finish if it were erroneously still the first
+ tester.injectElements(1, 2, 3, 4);
+ assertFalse(tester.shouldFire(window));
+
+ // Now fire once
+ tester.injectElements(5);
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertFalse(tester.isMarkedFinished(window));
+
+ // This time advance the watermark to finish the whole mess.
+ tester.advanceInputWatermark(new Instant(10));
+ assertTrue(tester.shouldFire(window));
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testToString() {
+ TriggerStateMachine trigger = AfterEachStateMachine.inOrder(
+ StubTriggerStateMachine.named("t1"),
+ StubTriggerStateMachine.named("t2"),
+ StubTriggerStateMachine.named("t3"));
+
+ assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
new file mode 100644
index 0000000..453c8ff
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link AfterFirstStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterFirstStateMachineTest {
+
+ @Mock private OnceTriggerStateMachine mockTrigger1;
+ @Mock private OnceTriggerStateMachine mockTrigger2;
+ private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+ private static TriggerStateMachine.TriggerContext anyTriggerContext() {
+ return Mockito.<TriggerStateMachine.TriggerContext>any();
+ }
+
+ @Before
+ public void initMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testNeitherShouldFireFixedWindows() throws Exception {
+ tester =
+ TriggerStateMachineTester.forTrigger(
+ AfterFirstStateMachine.of(mockTrigger1, mockTrigger2),
+ FixedWindows.of(Duration.millis(10)));
+
+ tester.injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+
+ when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
+
+ assertFalse(tester.shouldFire(window)); // should not fire
+ assertFalse(tester.isMarkedFinished(window)); // not finished
+ }
+
+ @Test
+ public void testOnlyT1ShouldFireFixedWindows() throws Exception {
+ tester =
+ TriggerStateMachineTester.forTrigger(
+ AfterFirstStateMachine.of(mockTrigger1, mockTrigger2),
+ FixedWindows.of(Duration.millis(10)));
+ tester.injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
+
+ when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
+
+ assertTrue(tester.shouldFire(window)); // should fire
+
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testOnlyT2ShouldFireFixedWindows() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
+ tester.injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
+
+ when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertTrue(tester.shouldFire(window)); // should fire
+
+ tester.fireIfShouldFire(window); // now finished
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ @Test
+ public void testBothShouldFireFixedWindows() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
+ tester.injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
+
+ when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
+ assertTrue(tester.shouldFire(window)); // should fire
+
+ tester.fireIfShouldFire(window);
+ assertTrue(tester.isMarkedFinished(window));
+ }
+
+ /**
+ * Tests that if the first trigger rewinds to be non-finished in the merged window,
+ * then it becomes the currently active trigger again, with real triggers.
+ */
+ @Test
+ public void testShouldFireAfterMerge() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterEachStateMachine.inOrder(
+ AfterFirstStateMachine.of(
+ AfterPaneStateMachine.elementCountAtLeast(5),
+ AfterWatermarkStateMachine.pastEndOfWindow()),
+ RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ // Finished the AfterFirst in the first window
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+ tester.advanceInputWatermark(new Instant(11));
+ assertTrue(tester.shouldFire(firstWindow));
+ tester.fireIfShouldFire(firstWindow);
+
+ // Set up second window where it is not done
+ tester.injectElements(5);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Merge them, if the merged window were on the second trigger, it would be ready
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
+ assertFalse(tester.shouldFire(mergedWindow));
+
+ // Now adding 3 more makes the AfterFirst ready to fire
+ tester.injectElements(1, 2, 3, 4, 5);
+ tester.mergeWindows();
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java
new file mode 100644
index 0000000..4240174
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link AfterPaneStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterPaneStateMachineTest {
+
+ private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+
+ /**
+ * Tests that the trigger fires once a window holds enough elements, and only for that window.
+ */
+ @Test
+ public void testAfterPaneElementCountFixedWindows() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterPaneStateMachine.elementCountAtLeast(2),
+ FixedWindows.of(Duration.millis(10)));
+
+ tester.injectElements(1); // [0, 10)
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ assertFalse(tester.shouldFire(window));
+
+ tester.injectElements(2); // [0, 10)
+ tester.injectElements(11); // [10, 20)
+
+ assertTrue(tester.shouldFire(window)); // ready to fire
+ tester.fireIfShouldFire(window); // and finished
+ assertTrue(tester.isMarkedFinished(window));
+
+ // The other window, holding a single element, must not be marked finished
+ assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
+ }
+
+ @Test
+ public void testClear() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterPaneStateMachine.elementCountAtLeast(2),
+ FixedWindows.of(Duration.millis(10)));
+
+ tester.injectElements(1, 2, 3); // all in [0, 10)
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.clearState(window);
+ tester.assertCleared(window);
+ }
+
+ @Test
+ public void testAfterPaneElementCountSessions() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterPaneStateMachine.elementCountAtLeast(2),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.injectElements(
+ 1, // in [1, 11)
+ 2); // in [2, 12)
+
+ assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11))));
+ assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12))));
+
+ tester.mergeWindows();
+
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
+ assertTrue(tester.shouldFire(mergedWindow));
+ tester.fireIfShouldFire(mergedWindow);
+ assertTrue(tester.isMarkedFinished(mergedWindow));
+
+ // The fired session is finished, so it is no longer around to merge with; the elements
+ // below form a fresh session that must become ready, fire and finish on its own.
+ tester.injectElements(
+ 7, // in [7, 17)
+ 9); // in [9, 19)
+
+ tester.mergeWindows();
+
+ IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19));
+ assertTrue(tester.shouldFire(newMergedWindow));
+ tester.fireIfShouldFire(newMergedWindow);
+ assertTrue(tester.isMarkedFinished(newMergedWindow));
+ }
+
+ @Test
+ public void testToString() {
+ TriggerStateMachine trigger = AfterPaneStateMachine.elementCountAtLeast(5);
+ assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
new file mode 100644
index 0000000..9fbf801
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterProcessingTimeStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterProcessingTimeStateMachineTest {
+
+ /**
+ * Tests the basic property that the trigger does wait for processing time to be
+ * far enough advanced.
+ */
+ @Test
+ public void testAfterProcessingTimeFixedWindows() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger(
+ AfterProcessingTimeStateMachine
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ FixedWindows.of(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+
+ // First element arrives at processing time 10, so the timer is at 15 (10 + 5ms delay)
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.advanceProcessingTime(new Instant(12));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ // Load up elements in the next window, timer at 17 for them
+ tester.injectElements(11, 12, 13);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Not quite time to fire
+ tester.advanceProcessingTime(new Instant(14));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
+ tester.injectElements(2, 3);
+
+ // Advance past the first timer and fire, finishing the first window
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.isMarkedFinished(firstWindow));
+
+ // The next window fires and finishes now
+ tester.advanceProcessingTime(new Instant(18));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(secondWindow);
+ assertTrue(tester.isMarkedFinished(secondWindow));
+ }
+
+ @Test
+ public void testClear() throws Exception {
+ SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger(
+ AfterProcessingTimeStateMachine
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ FixedWindows.of(Duration.millis(10)));
+
+ tester.injectElements(1, 2, 3); // all in [0, 10)
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.clearState(window);
+ tester.assertCleared(window);
+ }
+
+ /**
+ * Tests that when windows merge, if the trigger is waiting for "N millis after the first
+ * element" that it is relative to the earlier of the two merged windows.
+ */
+ @Test
+ public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+ SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger(
+ AfterProcessingTimeStateMachine
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ Sessions.withGapDuration(Duration.millis(10)));
+
+ tester.advanceProcessingTime(new Instant(10));
+ tester.injectElements(1); // in [1, 11), timer for 15
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ tester.advanceProcessingTime(new Instant(12));
+ tester.injectElements(3); // in [3, 13), timer for 17
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
+
+ tester.advanceProcessingTime(new Instant(16)); // past the earlier timer (15), before 17
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+
+ /**
+ * Basic test of compatibility check between identical triggers.
+ */
+ @Test
+ public void testCompatibilityIdentical() throws Exception {
+ TriggerStateMachine t1 = AfterProcessingTimeStateMachine.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(1L));
+ TriggerStateMachine t2 = AfterProcessingTimeStateMachine.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(1L));
+ assertTrue(t1.isCompatible(t2));
+ }
+
+ @Test
+ public void testToString() {
+ TriggerStateMachine trigger = AfterProcessingTimeStateMachine.pastFirstElementInPane();
+ assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString());
+ }
+
+ @Test
+ public void testWithDelayToString() {
+ TriggerStateMachine trigger = AfterProcessingTimeStateMachine.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(5));
+
+ assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)",
+ trigger.toString());
+ }
+
+ @Test
+ public void testBuiltUpToString() {
+ TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow()
+ .withLateFirings(AfterProcessingTimeStateMachine
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.standardMinutes(10)));
+
+ String expected = "AfterWatermark.pastEndOfWindow()"
+ + ".withLateFirings(AfterProcessingTime"
+ + ".pastFirstElementInPane()"
+ + ".plusDelayOf(10 minutes))";
+
+ assertEquals(expected, trigger.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
new file mode 100644
index 0000000..140bd62
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterSynchronizedProcessingTimeStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterSynchronizedProcessingTimeStateMachineTest {
+ // NOTE(review): underTest is unused; the tests below exercise AfterProcessingTimeStateMachine.
+ private TriggerStateMachine underTest = new AfterSynchronizedProcessingTimeStateMachine();
+
+ @Test
+ public void testAfterProcessingTimeWithFixedWindows() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger(
+ AfterProcessingTimeStateMachine
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ FixedWindows.of(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+
+ // First element arrives at processing time 10, so the timer is at 15 (10 + 5ms delay)
+ tester.injectElements(1);
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
+ tester.advanceProcessingTime(new Instant(12));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ // Load up elements in the next window, timer at 17 for them
+ tester.injectElements(11, 12, 13);
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Not quite time to fire
+ tester.advanceProcessingTime(new Instant(14));
+ assertFalse(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
+ tester.injectElements(2, 3);
+
+ // Advance past the first timer and fire, finishing the first window
+ tester.advanceProcessingTime(new Instant(16));
+ assertTrue(tester.shouldFire(firstWindow));
+ assertFalse(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(firstWindow);
+ assertTrue(tester.isMarkedFinished(firstWindow));
+
+ // The next window fires and finishes now
+ tester.advanceProcessingTime(new Instant(18));
+ assertTrue(tester.shouldFire(secondWindow));
+ tester.fireIfShouldFire(secondWindow);
+ assertTrue(tester.isMarkedFinished(secondWindow));
+ }
+
+ @Test
+ public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+ Duration windowDuration = Duration.millis(10);
+ SimpleTriggerStateMachineTester<IntervalWindow> tester = TriggerStateMachineTester.forTrigger(
+ AfterProcessingTimeStateMachine
+ .pastFirstElementInPane()
+ .plusDelayOf(Duration.millis(5)),
+ Sessions.withGapDuration(windowDuration));
+
+ tester.advanceProcessingTime(new Instant(10));
+ tester.injectElements(1); // in [1, 11), timer for 15
+ IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
+ assertFalse(tester.shouldFire(firstWindow));
+
+ tester.advanceProcessingTime(new Instant(12));
+ tester.injectElements(3); // in [3, 13), timer for 17
+ IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
+ assertFalse(tester.shouldFire(secondWindow));
+
+ tester.mergeWindows();
+ IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
+
+ tester.advanceProcessingTime(new Instant(16)); // past the earlier timer (15), before 17
+ assertTrue(tester.shouldFire(mergedWindow));
+ }
+}