You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:37 UTC
[03/17] incubator-beam git commit: Rename runners-core Trigger to
TriggerStateMachine
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
new file mode 100644
index 0000000..0ffbbca
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BitSetCoder;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
+
+/**
+ * Executes a trigger while managing persistence of information about which subtriggers are
+ * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
+ *
+ * <p>Specifically, the responsibilities are:
+ *
+ * <ul>
+ * <li>Invoking the trigger's methods via its {@link ExecutableTriggerStateMachine} wrapper by
+ * constructing the appropriate trigger contexts.</li>
+ * <li>Committing a record of which subtriggers are finished to persistent state.</li>
+ * <li>Restoring the record of which subtriggers are finished from persistent state.</li>
+ * <li>Clearing out the persisted finished set when a caller indicates
+ * (via {@link #clearFinished}) that it is no longer needed.</li>
+ * </ul>
+ *
+ * <p>These responsibilities are intertwined: trigger contexts include mutable information about
+ * which subtriggers are finished. This class provides the information when building the contexts
+ * and commits the information when the method of the {@link ExecutableTriggerStateMachine} returns.
+ *
+ * @param <W> The kind of windows being processed.
+ */
+public class TriggerStateMachineRunner<W extends BoundedWindow> {
+ @VisibleForTesting
+ static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+ StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
+
+ private final ExecutableTriggerStateMachine rootTrigger;
+ private final TriggerStateMachineContextFactory<W> contextFactory;
+
+ public TriggerStateMachineRunner(
+ ExecutableTriggerStateMachine rootTrigger,
+ TriggerStateMachineContextFactory<W> contextFactory) {
+ checkState(rootTrigger.getTriggerIndex() == 0);
+ this.rootTrigger = rootTrigger;
+ this.contextFactory = contextFactory;
+ }
+
+ private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
+ if (!isFinishedSetNeeded()) {
+ // If no trigger in the tree will ever have finished bits, then we don't need to read them.
+ // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
+ // finished) for each trigger in the tree.
+ return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+ }
+
+ BitSet bitSet = state.read();
+ return bitSet == null
+ ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
+ : FinishedTriggersBitSet.fromBitSet(bitSet);
+ }
+
+
+ private void clearFinishedBits(ValueState<BitSet> state) {
+ if (!isFinishedSetNeeded()) {
+ // Nothing to clear.
+ return;
+ }
+ state.clear();
+ }
+
+ /** Return true if the trigger is closed in the window corresponding to the specified state. */
+ public boolean isClosed(StateAccessor<?> state) {
+ return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
+ }
+
+ public void prefetchForValue(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchOnElement(
+ contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ public void prefetchOnFire(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ public void prefetchShouldFire(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchShouldFire(
+ contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ /**
+ * Run the trigger logic to deal with a new value.
+ */
+ public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
+ throws Exception {
+ // Clone so that we can detect changes and so that changes here don't pollute merging.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ TriggerStateMachine.OnElementContext triggerContext = contextFactory.createOnElementContext(
+ window, timers, timestamp, rootTrigger, finishedSet);
+ rootTrigger.invokeOnElement(triggerContext);
+ persistFinishedSet(state, finishedSet);
+ }
+
+ public void prefetchForMerge(
+ W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
+ if (isFinishedSetNeeded()) {
+ for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
+ value.readLater();
+ }
+ }
+ rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
+ window, mergingWindows, rootTrigger));
+ }
+
+ /**
+ * Run the trigger merging logic as part of executing the specified merge.
+ */
+ public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
+ // Clone so that we can detect changes and so that changes here don't pollute merging.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+
+ // And read the finished bits in each merging window.
+ ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
+ for (Map.Entry<W, ValueState<BitSet>> entry :
+ state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
+ // Don't need to clone these, since the trigger context doesn't allow modification
+ builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
+ // Clear the underlying finished bits.
+ clearFinishedBits(entry.getValue());
+ }
+ ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
+
+ TriggerStateMachine.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
+ window, timers, rootTrigger, finishedSet, mergingFinishedSets);
+
+ // Run the merge from the trigger
+ rootTrigger.invokeOnMerge(mergeContext);
+
+ persistFinishedSet(state, finishedSet);
+ }
+
+ public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ TriggerStateMachine.TriggerContext context = contextFactory.base(window, timers,
+ rootTrigger, finishedSet);
+ return rootTrigger.invokeShouldFire(context);
+ }
+
+ public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ // shouldFire should be true.
+ // However it is too expensive to assert.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ TriggerStateMachine.TriggerContext context = contextFactory.base(window, timers,
+ rootTrigger, finishedSet);
+ rootTrigger.invokeOnFire(context);
+ persistFinishedSet(state, finishedSet);
+ }
+
+ private void persistFinishedSet(
+ StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+ if (!isFinishedSetNeeded()) {
+ return;
+ }
+
+ ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+ if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
+ if (modifiedFinishedSet.getBitSet().isEmpty()) {
+ finishedSetState.clear();
+ } else {
+ finishedSetState.write(modifiedFinishedSet.getBitSet());
+ }
+ }
+ }
+
+ /**
+ * Clear the finished bits.
+ */
+ public void clearFinished(StateAccessor<?> state) {
+ clearFinishedBits(state.access(FINISHED_BITS_TAG));
+ }
+
+ /**
+ * Clear the state used for executing triggers, but leave the finished set to indicate
+ * the window is closed.
+ */
+ public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ // Don't need to clone, because we'll be clearing the finished bits anyways.
+ FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
+ rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
+ }
+
+ private boolean isFinishedSetNeeded() {
+ // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
+ // lookup. Right now, we special case this for the DefaultTrigger.
+ return !(rootTrigger.getSpec() instanceof DefaultTriggerStateMachine);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java
new file mode 100644
index 0000000..b7c7050
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * State machine implementations for triggers, called "triggers" because
+ * they react to events.
+ */
+package org.apache.beam.runners.core.triggers;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
deleted file mode 100644
index b591229..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterAllTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link AfterAll}.
- */
-@RunWith(JUnit4.class)
-public class AfterAllTest {
-
- private SimpleTriggerTester<IntervalWindow> tester;
-
- @Test
- public void testT1FiresFirst() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterAll.of(
- AfterPane.elementCountAtLeast(1),
- AfterPane.elementCountAtLeast(2)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testT2FiresFirst() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterAll.of(
- AfterPane.elementCountAtLeast(2),
- AfterPane.elementCountAtLeast(1)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that the AfterAll properly unsets finished bits when a merge causing it to become
- * unfinished.
- */
- @Test
- public void testOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterAll.of(
- AfterWatermark.pastEndOfWindow(),
- AfterPane.elementCountAtLeast(1)),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-
- // Finish the AfterAll in the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merge them; the AfterAll should not be finished
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertFalse(tester.isMarkedFinished(mergedWindow));
-
- // Confirm that we are back on the first trigger by probing that it is not ready to fire
- // after an element (with merging)
- tester.injectElements(3);
- tester.mergeWindows();
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Fire the AfterAll in the merged window
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
-
- // Confirm that we are on the second trigger by probing
- tester.injectElements(2);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
- tester.injectElements(2);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- AfterAll.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(1))
- .getWatermarkThatGuaranteesFiring(window));
- }
-
- @Test
- public void testContinuation() throws Exception {
- OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
- OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
- Trigger afterAll = AfterAll.of(trigger1, trigger2);
- assertEquals(
- AfterAll.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()),
- afterAll.getContinuationTrigger());
- }
-
- @Test
- public void testToString() {
- Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
- assertEquals("AfterAll.of(t1, t2)", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
deleted file mode 100644
index c413c6e..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterEachTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AfterEach}.
- */
-@RunWith(JUnit4.class)
-public class AfterEachTest {
-
- private SimpleTriggerTester<IntervalWindow> tester;
-
- @Before
- public void initMocks() {
- MockitoAnnotations.initMocks(this);
- }
-
- /**
- * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second.
- */
- @Test
- public void testAfterEachInSequence() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2))
- .orFinally(AfterPane.elementCountAtLeast(3)),
- Repeatedly.forever(AfterPane.elementCountAtLeast(5))
- .orFinally(AfterWatermark.pastEndOfWindow())),
- FixedWindows.of(Duration.millis(10)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- // AfterCount(2) not ready
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- // AfterCount(2) ready, not finished
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // orFinally(AfterCount(3)) ready and will finish the first
- tester.injectElements(1, 2, 3);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // Now running as the second trigger
- assertFalse(tester.shouldFire(window));
- // This quantity of elements would fire and finish if it were erroneously still the first
- tester.injectElements(1, 2, 3, 4);
- assertFalse(tester.shouldFire(window));
-
- // Now fire once
- tester.injectElements(5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // This time advance the watermark to finish the whole mess.
- tester.advanceInputWatermark(new Instant(10));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- assertEquals(new Instant(9),
- AfterEach.inOrder(AfterWatermark.pastEndOfWindow(),
- AfterPane.elementCountAtLeast(4))
- .getWatermarkThatGuaranteesFiring(window));
-
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- AfterEach.inOrder(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow())
- .getWatermarkThatGuaranteesFiring(window));
- }
-
- @Test
- public void testContinuation() throws Exception {
- OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
- OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
- Trigger afterEach = AfterEach.inOrder(trigger1, trigger2);
- assertEquals(
- Repeatedly.forever(AfterFirst.of(
- trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())),
- afterEach.getContinuationTrigger());
- }
-
- @Test
- public void testToString() {
- Trigger trigger = AfterEach.inOrder(
- StubTrigger.named("t1"),
- StubTrigger.named("t2"),
- StubTrigger.named("t3"));
-
- assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
deleted file mode 100644
index 415060b..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterFirstTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link AfterFirst}.
- */
-@RunWith(JUnit4.class)
-public class AfterFirstTest {
-
- @Mock private OnceTrigger mockTrigger1;
- @Mock private OnceTrigger mockTrigger2;
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
-
- @Before
- public void initMocks() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- public void testNeitherShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
- assertFalse(tester.shouldFire(window)); // should not fire
- assertFalse(tester.isMarkedFinished(window)); // not finished
- }
-
- @Test
- public void testOnlyT1ShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
- assertTrue(tester.shouldFire(window)); // should fire
-
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testOnlyT2ShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(window)); // should fire
-
- tester.fireIfShouldFire(window); // now finished
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testBothShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(window)); // should fire
-
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that if the first trigger rewinds to be non-finished in the merged window,
- * then it becomes the currently active trigger again, with real triggers.
- */
- @Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterFirst.of(AfterPane.elementCountAtLeast(5),
- AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- // Finished the AfterFirst in the first window
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Set up second window where it is not done
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them, if the merged window were on the second trigger, it would be ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now adding 3 more makes the AfterFirst ready to fire
- tester.injectElements(1, 2, 3, 4, 5);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- assertEquals(new Instant(9),
- AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(4))
- .getWatermarkThatGuaranteesFiring(window));
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(1))
- .getWatermarkThatGuaranteesFiring(window));
- }
-
- @Test
- public void testContinuation() throws Exception {
- OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane();
- OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow();
- Trigger afterFirst = AfterFirst.of(trigger1, trigger2);
- assertEquals(
- AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()),
- afterFirst.getContinuationTrigger());
- }
-
- @Test
- public void testToString() {
- Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2"));
- assertEquals("AfterFirst.of(t1, t2)", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
deleted file mode 100644
index 38d030e..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterPaneTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link AfterPane}.
- */
-@RunWith(JUnit4.class)
-public class AfterPaneTest {
-
- SimpleTriggerTester<IntervalWindow> tester;
- /**
- * Tests that the trigger does fire when enough elements are in a window, and that it only
- * fires that window (no leakage).
- */
- @Test
- public void testAfterPaneElementCountFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterPane.elementCountAtLeast(2),
- FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1); // [0, 10)
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2); // [0, 10)
- tester.injectElements(11); // [10, 20)
-
- assertTrue(tester.shouldFire(window)); // ready to fire
- tester.fireIfShouldFire(window); // and finished
- assertTrue(tester.isMarkedFinished(window));
-
- // But don't finish the other window
- assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
- }
-
- @Test
- public void testClear() throws Exception {
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterPane.elementCountAtLeast(2),
- FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1, 2, 3);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- tester.clearState(window);
- tester.assertCleared(window);
- }
-
- @Test
- public void testAfterPaneElementCountSessions() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterPane.elementCountAtLeast(2),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(
- 1, // in [1, 11)
- 2); // in [2, 12)
-
- assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11))));
- assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12))));
-
- tester.mergeWindows();
-
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
- assertTrue(tester.isMarkedFinished(mergedWindow));
-
- // Because we closed the previous window, we don't have it around to merge with. So there
- // will be a new FIRE_AND_FINISH result.
- tester.injectElements(
- 7, // in [7, 17)
- 9); // in [9, 19)
-
- tester.mergeWindows();
-
- IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19));
- assertTrue(tester.shouldFire(newMergedWindow));
- tester.fireIfShouldFire(newMergedWindow);
- assertTrue(tester.isMarkedFinished(newMergedWindow));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- AfterPane.elementCountAtLeast(1).getWatermarkThatGuaranteesFiring(
- new IntervalWindow(new Instant(0), new Instant(10))));
- }
-
- @Test
- public void testContinuation() throws Exception {
- assertEquals(
- AfterPane.elementCountAtLeast(1),
- AfterPane.elementCountAtLeast(100).getContinuationTrigger());
- assertEquals(
- AfterPane.elementCountAtLeast(1),
- AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger());
- }
-
- @Test
- public void testToString() {
- Trigger trigger = AfterPane.elementCountAtLeast(5);
- assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
deleted file mode 100644
index 13a7acf..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterProcessingTimeTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link AfterProcessingTime}.
- */
-@RunWith(JUnit4.class)
-public class AfterProcessingTimeTest {
-
- /**
- * Tests the basic property that the trigger does wait for processing time to be
- * far enough advanced.
- */
- @Test
- public void testAfterProcessingTimeFixedWindows() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- FixedWindows.of(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
-
- // Timer at 15
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
- tester.advanceProcessingTime(new Instant(12));
- assertFalse(tester.shouldFire(firstWindow));
-
- // Load up elements in the next window, timer at 17 for them
- tester.injectElements(11, 12, 13);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Not quite time to fire
- tester.advanceProcessingTime(new Instant(14));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
- tester.injectElements(2, 3);
-
- // Advance past the first timer and fire, finishing the first window
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.isMarkedFinished(firstWindow));
-
- // The next window fires and finishes now
- tester.advanceProcessingTime(new Instant(18));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(secondWindow);
- assertTrue(tester.isMarkedFinished(secondWindow));
- }
-
- /**
- * Tests that clearing a window's state leaves it cleared. (That a merged window's delay runs
- * from the earlier merged window is covered by testAfterProcessingTimeWithMergingWindow.)
- */
- @Test
- public void testClear() throws Exception {
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1, 2, 3);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- tester.clearState(window);
- tester.assertCleared(window);
- }
-
- @Test
- public void testAfterProcessingTimeWithMergingWindow() throws Exception {
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.advanceProcessingTime(new Instant(10));
- tester.injectElements(1); // in [1, 11), timer for 15
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.advanceProcessingTime(new Instant(12));
- tester.injectElements(3); // in [3, 13), timer for 17
- IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
- assertFalse(tester.shouldFire(secondWindow));
-
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- AfterProcessingTime.pastFirstElementInPane().getWatermarkThatGuaranteesFiring(
- new IntervalWindow(new Instant(0), new Instant(10))));
- }
-
- @Test
- public void testContinuation() throws Exception {
- OnceTrigger firstElementPlus1 =
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1));
- assertEquals(
- new AfterSynchronizedProcessingTime(),
- firstElementPlus1.getContinuationTrigger());
- }
-
- /**
- * Basic test of compatibility check between identical triggers.
- */
- @Test
- public void testCompatibilityIdentical() throws Exception {
- Trigger t1 = AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(1L));
- Trigger t2 = AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(1L));
- assertTrue(t1.isCompatible(t2));
- }
-
- @Test
- public void testToString() {
- Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
- assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString());
- }
-
- @Test
- public void testWithDelayToString() {
- Trigger trigger = AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(5));
-
- assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)",
- trigger.toString());
- }
-
- @Test
- public void testBuiltUpToString() {
- Trigger trigger = AfterWatermark.pastEndOfWindow()
- .withLateFirings(AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(10)));
-
- String expected = "AfterWatermark.pastEndOfWindow()"
- + ".withLateFirings(AfterProcessingTime"
- + ".pastFirstElementInPane()"
- + ".plusDelayOf(10 minutes))";
-
- assertEquals(expected, trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
deleted file mode 100644
index 7e6e938..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTimeTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link AfterSynchronizedProcessingTime}.
- */
-@RunWith(JUnit4.class)
-public class AfterSynchronizedProcessingTimeTest {
-
- private Trigger underTest = new AfterSynchronizedProcessingTime();
-
- @Test
- public void testAfterProcessingTimeWithFixedWindows() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- FixedWindows.of(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
-
- // Timer at 15
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
- tester.advanceProcessingTime(new Instant(12));
- assertFalse(tester.shouldFire(firstWindow));
-
- // Load up elements in the next window, timer at 17 for them
- tester.injectElements(11, 12, 13);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Not quite time to fire
- tester.advanceProcessingTime(new Instant(14));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
- tester.injectElements(2, 3);
-
- // Advance past the first timer and fire, finishing the first window
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.isMarkedFinished(firstWindow));
-
- // The next window fires and finishes now
- tester.advanceProcessingTime(new Instant(18));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(secondWindow);
- assertTrue(tester.isMarkedFinished(secondWindow));
- }
-
- @Test
- public void testAfterProcessingTimeWithMergingWindow() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- Sessions.withGapDuration(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
- tester.injectElements(1); // in [1, 11), timer for 15
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.advanceProcessingTime(new Instant(12));
- tester.injectElements(3); // in [3, 13), timer for 17
- IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
- assertFalse(tester.shouldFire(secondWindow));
-
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- underTest.getWatermarkThatGuaranteesFiring(
- new IntervalWindow(new Instant(0), new Instant(10))));
- }
-
- @Test
- public void testContinuation() throws Exception {
- assertEquals(underTest, underTest.getContinuationTrigger());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
deleted file mode 100644
index 084027b..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/AfterWatermarkTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests the {@link AfterWatermark} triggers.
- */
-@RunWith(JUnit4.class)
-public class AfterWatermarkTest {
-
- @Mock private OnceTrigger mockEarly;
- @Mock private OnceTrigger mockLate;
-
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
- private static Trigger.OnElementContext anyElementContext() {
- return Mockito.<Trigger.OnElementContext>any();
- }
-
- private void injectElements(int... elements) throws Exception {
- for (int element : elements) {
- doNothing().when(mockEarly).onElement(anyElementContext());
- doNothing().when(mockLate).onElement(anyElementContext());
- tester.injectElements(element);
- }
- }
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- }
-
- public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
- throws Exception {
-
- // Don't fire due to mock saying no
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- assertFalse(tester.shouldFire(window)); // not ready
-
- // Fire due to mock trigger; early trigger is required to be a OnceTrigger
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(window)); // ready
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testEarlyAndAtWatermark() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(mockEarly),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- testRunningAsTrigger(mockEarly, window);
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testAtWatermarkAndLate() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withLateFirings(mockLate),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // No early firing, just double checking
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
- assertFalse(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- testRunningAsTrigger(mockLate, window);
- }
-
- @Test
- public void testEarlyAndAtWatermarkAndLate() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(mockEarly)
- .withLateFirings(mockLate),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- testRunningAsTrigger(mockEarly, window);
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- testRunningAsTrigger(mockLate, window);
- }
-
- /**
- * Tests that if the end-of-window (EOW) trigger has finished in both of the windows
- * being merged, then it is also finished in the merged result.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testOnMergeAlreadyFinished() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterWatermark.pastEndOfWindow(),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.injectElements(1);
- tester.injectElements(5);
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Merging should leave it finished
- tester.mergeWindows();
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that the trigger rewinds to be non-finished in the merged window.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterWatermark.pastEndOfWindow(),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merging should re-activate the watermark trigger in the merged window
- tester.mergeWindows();
-
- // Confirm that we are not on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertFalse(tester.shouldFire(mergedWindow));
-
- // And confirm that advancing the watermark fires again
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that if the end-of-window (EOW) firing has already happened in both of the windows
- * being merged, then the merged result stays on the late trigger.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterPane.elementCountAtLeast(100))
- .withLateFirings(AfterPane.elementCountAtLeast(1)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.injectElements(1);
- tester.injectElements(5);
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Merging should leave it on the late trigger
- tester.mergeWindows();
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that the trigger rewinds to be non-finished in the merged window.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testEarlyAndLateOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterPane.elementCountAtLeast(100))
- .withLateFirings(AfterPane.elementCountAtLeast(1)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merging should re-activate the early trigger in the merged window
- tester.mergeWindows();
-
- // Confirm that we are not on the late trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertFalse(tester.shouldFire(mergedWindow));
-
- // And confirm that advancing the watermark fires again
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testFromEndOfWindowToString() {
- Trigger trigger = AfterWatermark.pastEndOfWindow();
- assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
- }
-
- @Test
- public void testEarlyFiringsToString() {
- Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1"));
-
- assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString());
- }
-
- @Test
- public void testLateFiringsToString() {
- Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1"));
-
- assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString());
- }
-
- @Test
- public void testEarlyAndLateFiringsToString() {
- Trigger trigger =
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(StubTrigger.named("t1"))
- .withLateFirings(StubTrigger.named("t2"));
-
- assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
- trigger.toString());
- }
-
- @Test
- public void testToStringExcludesNeverTrigger() {
- Trigger trigger =
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(Never.ever())
- .withLateFirings(Never.ever());
-
- assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
deleted file mode 100644
index 673e555..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/DefaultTriggerTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link DefaultTrigger}, which should be equivalent to
- * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
- */
-@RunWith(JUnit4.class)
-public class DefaultTriggerTest {
-
- SimpleTriggerTester<IntervalWindow> tester;
-
- @Test
- public void testDefaultTriggerFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- FixedWindows.of(Duration.millis(100)));
-
- tester.injectElements(
- 1, // [0, 100)
- 101); // [100, 200)
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200));
-
- // Advance the watermark almost to the end of the first window.
- tester.advanceInputWatermark(new Instant(99));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Advance watermark past end of the first window, which is then ready
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Fire, but the first window is still allowed to fire
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Advance watermark to 200, then both are ready
- tester.advanceInputWatermark(new Instant(200));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
-
- assertFalse(tester.isMarkedFinished(firstWindow));
- assertFalse(tester.isMarkedFinished(secondWindow));
- }
-
- @Test
- public void testDefaultTriggerSlidingWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
-
- tester.injectElements(
- 1, // [-50, 50), [0, 100)
- 50); // [0, 100), [50, 150)
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100));
- IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150));
-
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 50, the first becomes ready; it stays ready after firing
- tester.advanceInputWatermark(new Instant(50));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 99, the first is still the only one ready
- tester.advanceInputWatermark(new Instant(99));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 100, the first and second are ready
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
- tester.fireIfShouldFire(firstWindow);
-
- assertFalse(tester.isMarkedFinished(firstWindow));
- assertFalse(tester.isMarkedFinished(secondWindow));
- assertFalse(tester.isMarkedFinished(thirdWindow));
- }
-
- @Test
- public void testDefaultTriggerSessions() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- Sessions.withGapDuration(Duration.millis(100)));
-
- tester.injectElements(
- 1, // [1, 101)
- 50); // [50, 150)
- tester.mergeWindows();
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150));
-
- // Not ready in any window yet
- tester.advanceInputWatermark(new Instant(100));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // The first window is "ready": the caller owns knowledge of which windows are merged away
- tester.advanceInputWatermark(new Instant(149));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now ready on all windows
- tester.advanceInputWatermark(new Instant(150));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- assertTrue(tester.shouldFire(mergedWindow));
-
- // Ensure it repeats
- tester.fireIfShouldFire(mergedWindow);
- assertTrue(tester.shouldFire(mergedWindow));
-
- assertFalse(tester.isMarkedFinished(mergedWindow));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
- new IntervalWindow(new Instant(0), new Instant(10))));
- assertEquals(GlobalWindow.INSTANCE.maxTimestamp(),
- DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE));
- }
-
- @Test
- public void testContinuation() throws Exception {
- assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
deleted file mode 100644
index 1e3a1ff..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ExecutableTriggerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ExecutableTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ExecutableTriggerTest {
-
- @Test
- public void testIndexAssignmentLeaf() throws Exception {
- StubTrigger t1 = new StubTrigger();
- ExecutableTrigger executable = ExecutableTrigger.create(t1);
- assertEquals(0, executable.getTriggerIndex());
- }
-
- @Test
- public void testIndexAssignmentOneLevel() throws Exception {
- StubTrigger t1 = new StubTrigger();
- StubTrigger t2 = new StubTrigger();
- StubTrigger t = new StubTrigger(t1, t2);
-
- ExecutableTrigger executable = ExecutableTrigger.create(t);
-
- assertEquals(0, executable.getTriggerIndex());
- assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
- assertSame(t1, executable.subTriggers().get(0).getSpec());
- assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
- assertSame(t2, executable.subTriggers().get(1).getSpec());
- }
-
- @Test
- public void testIndexAssignmentTwoLevel() throws Exception {
- StubTrigger t11 = new StubTrigger();
- StubTrigger t12 = new StubTrigger();
- StubTrigger t13 = new StubTrigger();
- StubTrigger t14 = new StubTrigger();
- StubTrigger t21 = new StubTrigger();
- StubTrigger t22 = new StubTrigger();
- StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
- StubTrigger t2 = new StubTrigger(t21, t22);
- StubTrigger t = new StubTrigger(t1, t2);
-
- ExecutableTrigger executable = ExecutableTrigger.create(t);
-
- assertEquals(0, executable.getTriggerIndex());
- assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
- assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
- assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
-
- assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
- assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
- assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
- }
-
- private static class StubTrigger extends Trigger {
-
- @SafeVarargs
- protected StubTrigger(Trigger... subTriggers) {
- super(Arrays.asList(subTriggers));
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception { }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception { }
-
- @Override
- public void clear(TriggerContext c) throws Exception {
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public boolean isCompatible(Trigger other) {
- return false;
- }
-
- @Override
- public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
-
- @Override
- public boolean shouldFire(TriggerContext c) {
- return false;
- }
-
- @Override
- public void onFire(TriggerContext c) { }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
deleted file mode 100644
index 7f74620..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSetTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersBitSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersBitSetTest {
- /**
- * Tests that after a trigger is set to finished, it reads back as finished.
- */
- @Test
- public void testSetGet() {
- FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
- * others.
- */
- @Test
- public void testClearRecursively() {
- FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
- }
-
- @Test
- public void testCopy() throws Exception {
- FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10);
- assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
deleted file mode 100644
index a66f74f..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersProperties.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-
-/**
- * Generalized tests for {@link FinishedTriggers} implementations.
- */
-public class FinishedTriggersProperties {
- /**
- * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set
- * finished, it is correctly reported as finished.
- */
- public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertFalse(finishedSet.isFinished(trigger));
- finishedSet.setFinished(trigger, true);
- assertTrue(finishedSet.isFinished(trigger));
- }
-
- /**
- * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly
- * reported as finished.
- */
- public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
- ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
- AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
- AfterAll.of(
- AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
- verifyGetAfterSet(finishedSet, trigger);
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
- * others.
- */
- public static void verifyClearRecursively(FinishedTriggers finishedSet) {
- ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
- AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
- AfterAll.of(
- AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
- // Set them all finished. This method is not on a trigger as it makes no sense outside tests.
- setFinishedRecursively(finishedSet, trigger);
- assertTrue(finishedSet.isFinished(trigger));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
-
- // Clear just the second AfterAll
- finishedSet.clearRecursively(trigger.subTriggers().get(1));
-
- // Check that the first and all that are still finished
- assertTrue(finishedSet.isFinished(trigger));
- verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
- verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
- }
-
- private static void setFinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- finishedSet.setFinished(trigger, true);
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- setFinishedRecursively(finishedSet, subTrigger);
- }
- }
-
- private static void verifyFinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertTrue(finishedSet.isFinished(trigger));
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- verifyFinishedRecursively(finishedSet, subTrigger);
- }
- }
-
- private static void verifyUnfinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertFalse(finishedSet.isFinished(trigger));
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- verifyUnfinishedRecursively(finishedSet, subTrigger);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
deleted file mode 100644
index 072d264..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/FinishedTriggersSetTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import java.util.HashSet;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersSetTest {
- /**
- * Tests that after a trigger is set to finished, it reads back as finished.
- */
- @Test
- public void testSetGet() {
- FinishedTriggersProperties.verifyGetAfterSet(
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
- * others.
- */
- @Test
- public void testClearRecursively() {
- FinishedTriggersProperties.verifyClearRecursively(
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
- }
-
- @Test
- public void testCopy() throws Exception {
- FinishedTriggersSet finishedSet =
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
- assertThat(finishedSet.copy().getFinishedTriggers(),
- not(theInstance(finishedSet.getFinishedTriggers())));
- }
-}