You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/25 17:14:05 UTC
[3/5] incubator-beam git commit: Remove pieces of Trigger now owned
by TriggerStateMachine
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
deleted file mode 100644
index e09aac2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.joda.time.Instant;
-
-/**
- * Factory for creating instances of the various {@link Trigger} contexts.
- *
- * <p>These contexts are highly interdependent and share many fields; it is inadvisable
- * to create them via any means other than this factory class.
- */
-public class TriggerContextFactory<W extends BoundedWindow> {
-
- private final WindowFn<?, W> windowFn;
- private StateInternals<?> stateInternals;
- private final Coder<W> windowCoder;
-
- public TriggerContextFactory(WindowFn<?, W> windowFn,
- StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
- // Future triggers may be able to exploit the active window to state address window mapping.
- this.windowFn = windowFn;
- this.stateInternals = stateInternals;
- this.windowCoder = windowFn.windowCoder();
- }
-
- public Trigger.TriggerContext base(W window, Timers timers,
- ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
- return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
- }
-
- public Trigger.OnElementContext createOnElementContext(
- W window, Timers timers, Instant elementTimestamp,
- ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) {
- return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
- }
-
- public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers,
- ExecutableTrigger rootTrigger, FinishedTriggers finishedSet,
- Map<W, FinishedTriggers> finishedSets) {
- return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
- }
-
- public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger trigger) {
- return new StateAccessorImpl(window, trigger);
- }
-
- public MergingStateAccessor<?, W> createMergingStateAccessor(
- W mergeResult, Collection<W> mergingWindows, ExecutableTrigger trigger) {
- return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
- }
-
- private class TriggerInfoImpl implements Trigger.TriggerInfo {
-
- protected final ExecutableTrigger trigger;
- protected final FinishedTriggers finishedSet;
- private final Trigger.TriggerContext context;
-
- public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet,
- Trigger.TriggerContext context) {
- this.trigger = trigger;
- this.finishedSet = finishedSet;
- this.context = context;
- }
-
- @Override
- public boolean isMerging() {
- return !windowFn.isNonMerging();
- }
-
- @Override
- public Iterable<ExecutableTrigger> subTriggers() {
- return trigger.subTriggers();
- }
-
- @Override
- public ExecutableTrigger subTrigger(int subtriggerIndex) {
- return trigger.subTriggers().get(subtriggerIndex);
- }
-
- @Override
- public boolean isFinished() {
- return finishedSet.isFinished(trigger);
- }
-
- @Override
- public boolean isFinished(int subtriggerIndex) {
- return finishedSet.isFinished(subTrigger(subtriggerIndex));
- }
-
- @Override
- public boolean areAllSubtriggersFinished() {
- return Iterables.isEmpty(unfinishedSubTriggers());
- }
-
- @Override
- public Iterable<ExecutableTrigger> unfinishedSubTriggers() {
- return FluentIterable
- .from(trigger.subTriggers())
- .filter(new Predicate<ExecutableTrigger>() {
- @Override
- public boolean apply(ExecutableTrigger trigger) {
- return !finishedSet.isFinished(trigger);
- }
- });
- }
-
- @Override
- public ExecutableTrigger firstUnfinishedSubTrigger() {
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- if (!finishedSet.isFinished(subTrigger)) {
- return subTrigger;
- }
- }
- return null;
- }
-
- @Override
- public void resetTree() throws Exception {
- finishedSet.clearRecursively(trigger);
- trigger.invokeClear(context);
- }
-
- @Override
- public void setFinished(boolean finished) {
- finishedSet.setFinished(trigger, finished);
- }
-
- @Override
- public void setFinished(boolean finished, int subTriggerIndex) {
- finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
- }
- }
-
- private class TriggerTimers implements Timers {
-
- private final Timers timers;
- private final W window;
-
- public TriggerTimers(W window, Timers timers) {
- this.timers = timers;
- this.window = window;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timers.setTimer(timestamp, timeDomain);
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- if (timeDomain == TimeDomain.EVENT_TIME
- && timestamp.equals(window.maxTimestamp())) {
- // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
- // state transitions.
- return;
- }
- timers.deleteTimer(timestamp, timeDomain);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-
- private class MergingTriggerInfoImpl
- extends TriggerInfoImpl implements Trigger.MergingTriggerInfo {
-
- private final Map<W, FinishedTriggers> finishedSets;
-
- public MergingTriggerInfoImpl(
- ExecutableTrigger trigger,
- FinishedTriggers finishedSet,
- Trigger.TriggerContext context,
- Map<W, FinishedTriggers> finishedSets) {
- super(trigger, finishedSet, context);
- this.finishedSets = finishedSets;
- }
-
- @Override
- public boolean finishedInAnyMergingWindow() {
- for (FinishedTriggers finishedSet : finishedSets.values()) {
- if (finishedSet.isFinished(trigger)) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public boolean finishedInAllMergingWindows() {
- for (FinishedTriggers finishedSet : finishedSets.values()) {
- if (!finishedSet.isFinished(trigger)) {
- return false;
- }
- }
- return true;
- }
- }
-
- private class StateAccessorImpl implements StateAccessor<Object> {
- protected final int triggerIndex;
- protected final StateNamespace windowNamespace;
-
- public StateAccessorImpl(
- W window,
- ExecutableTrigger trigger) {
- this.triggerIndex = trigger.getTriggerIndex();
- this.windowNamespace = namespaceFor(window);
- }
-
- protected StateNamespace namespaceFor(W window) {
- return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
- }
-
- @Override
- public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
- return stateInternals.state(windowNamespace, address);
- }
- }
-
- private class MergingStateAccessorImpl extends StateAccessorImpl
- implements MergingStateAccessor<Object, W> {
- private final Collection<W> activeToBeMerged;
-
- public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection<W> activeToBeMerged,
- W mergeResult) {
- super(mergeResult, trigger);
- this.activeToBeMerged = activeToBeMerged;
- }
-
- @Override
- public <StateT extends State> StateT access(
- StateTag<? super Object, StateT> address) {
- return stateInternals.state(windowNamespace, address);
- }
-
- @Override
- public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super Object, StateT> address) {
- ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
- for (W mergingWindow : activeToBeMerged) {
- StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
- builder.put(mergingWindow, stateForWindow);
- }
- return builder.build();
- }
- }
-
- private class TriggerContextImpl extends Trigger.TriggerContext {
-
- private final W window;
- private final StateAccessorImpl state;
- private final Timers timers;
- private final TriggerInfoImpl triggerInfo;
-
- private TriggerContextImpl(
- W window,
- Timers timers,
- ExecutableTrigger trigger,
- FinishedTriggers finishedSet) {
- trigger.getSpec().super();
- this.window = window;
- this.state = new StateAccessorImpl(window, trigger);
- this.timers = new TriggerTimers(window, timers);
- this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
- }
-
- @Override
- public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) {
- return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
- }
-
- @Override
- public TriggerInfo trigger() {
- return triggerInfo;
- }
-
- @Override
- public StateAccessor<?> state() {
- return state;
- }
-
- @Override
- public W window() {
- return window;
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain domain) {
- timers.deleteTimer(timestamp, domain);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-
- private class OnElementContextImpl extends Trigger.OnElementContext {
-
- private final W window;
- private final StateAccessorImpl state;
- private final Timers timers;
- private final TriggerInfoImpl triggerInfo;
- private final Instant eventTimestamp;
-
- private OnElementContextImpl(
- W window,
- Timers timers,
- ExecutableTrigger trigger,
- FinishedTriggers finishedSet,
- Instant eventTimestamp) {
- trigger.getSpec().super();
- this.window = window;
- this.state = new StateAccessorImpl(window, trigger);
- this.timers = new TriggerTimers(window, timers);
- this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
- this.eventTimestamp = eventTimestamp;
- }
-
-
- @Override
- public Instant eventTimestamp() {
- return eventTimestamp;
- }
-
- @Override
- public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) {
- return new OnElementContextImpl(
- window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
- }
-
- @Override
- public TriggerInfo trigger() {
- return triggerInfo;
- }
-
- @Override
- public StateAccessor<?> state() {
- return state;
- }
-
- @Override
- public W window() {
- return window;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain domain) {
- timers.setTimer(timestamp, domain);
- }
-
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain domain) {
- timers.deleteTimer(timestamp, domain);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-
- private class OnMergeContextImpl extends Trigger.OnMergeContext {
- private final MergingStateAccessor<?, W> state;
- private final W window;
- private final Collection<W> mergingWindows;
- private final Timers timers;
- private final MergingTriggerInfoImpl triggerInfo;
-
- private OnMergeContextImpl(
- W window,
- Timers timers,
- ExecutableTrigger trigger,
- FinishedTriggers finishedSet,
- Map<W, FinishedTriggers> finishedSets) {
- trigger.getSpec().super();
- this.mergingWindows = finishedSets.keySet();
- this.window = window;
- this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
- this.timers = new TriggerTimers(window, timers);
- this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
- }
-
- @Override
- public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) {
- return new OnMergeContextImpl(
- window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
- }
-
- @Override
- public MergingStateAccessor<?, W> state() {
- return state;
- }
-
- @Override
- public MergingTriggerInfo trigger() {
- return triggerInfo;
- }
-
- @Override
- public W window() {
- return window;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain domain) {
- timers.setTimer(timestamp, domain);
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain domain) {
- // Must delegate to deleteTimer, not setTimer; the wrapper still keeps the end-of-window timer.
- timers.deleteTimer(timestamp, domain);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java
index b591229..5b213c7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java
@@ -18,13 +18,8 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,99 +31,6 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class AfterAllTest {
- private SimpleTriggerTester<IntervalWindow> tester;
-
- @Test
- public void testT1FiresFirst() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterAll.of(
- AfterPane.elementCountAtLeast(1),
- AfterPane.elementCountAtLeast(2)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testT2FiresFirst() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterAll.of(
- AfterPane.elementCountAtLeast(2),
- AfterPane.elementCountAtLeast(1)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that the AfterAll properly unsets finished bits when a merge causes it to become
- * unfinished.
- */
- @Test
- public void testOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterAll.of(
- AfterWatermark.pastEndOfWindow(),
- AfterPane.elementCountAtLeast(1)),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
-
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
-
- // Finish the AfterAll in the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merge them; the AfterAll should not be finished
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertFalse(tester.isMarkedFinished(mergedWindow));
-
- // Confirm that we are back on the first trigger by probing that it is not ready to fire
- // after an element (with merging)
- tester.injectElements(3);
- tester.mergeWindows();
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Fire the AfterAll in the merged window
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
-
- // Confirm that we are on the second trigger by probing
- tester.injectElements(2);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
- tester.injectElements(2);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
- }
-
@Test
public void testFireDeadline() throws Exception {
BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java
index c413c6e..00d25e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java
@@ -18,19 +18,12 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
/**
* Tests for {@link AfterEach}.
@@ -38,63 +31,6 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class AfterEachTest {
- private SimpleTriggerTester<IntervalWindow> tester;
-
- @Before
- public void initMocks() {
- MockitoAnnotations.initMocks(this);
- }
-
- /**
- * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second.
- */
- @Test
- public void testAfterEachInSequence() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2))
- .orFinally(AfterPane.elementCountAtLeast(3)),
- Repeatedly.forever(AfterPane.elementCountAtLeast(5))
- .orFinally(AfterWatermark.pastEndOfWindow())),
- FixedWindows.of(Duration.millis(10)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- // AfterCount(2) not ready
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- // AfterCount(2) ready, not finished
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // orFinally(AfterCount(3)) ready and will finish the first
- tester.injectElements(1, 2, 3);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // Now running as the second trigger
- assertFalse(tester.shouldFire(window));
- // This quantity of elements would fire and finish if it were erroneously still the first
- tester.injectElements(1, 2, 3, 4);
- assertFalse(tester.shouldFire(window));
-
- // Now fire once
- tester.injectElements(5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // This time advance the watermark to finish the whole mess.
- tester.advanceInputWatermark(new Instant(10));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
@Test
public void testFireDeadline() throws Exception {
BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java
index 415060b..2887edb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java
@@ -18,22 +18,12 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
/**
* Tests for {@link AfterFirst}.
@@ -41,116 +31,6 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class AfterFirstTest {
- @Mock private OnceTrigger mockTrigger1;
- @Mock private OnceTrigger mockTrigger2;
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
-
- @Before
- public void initMocks() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- public void testNeitherShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
- assertFalse(tester.shouldFire(window)); // should not fire
- assertFalse(tester.isMarkedFinished(window)); // not finished
- }
-
- @Test
- public void testOnlyT1ShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
-
- assertTrue(tester.shouldFire(window)); // should fire
-
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testOnlyT2ShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(window)); // should fire
-
- tester.fireIfShouldFire(window); // now finished
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testBothShouldFireFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10)));
- tester.injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11));
-
- when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
- when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(window)); // should fire
-
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that if the first trigger rewinds to be non-finished in the merged window,
- * then it becomes the currently active trigger again, with real triggers.
- */
- @Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterFirst.of(AfterPane.elementCountAtLeast(5),
- AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- // Finished the AfterFirst in the first window
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Set up second window where it is not done
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them, if the merged window were on the second trigger, it would be ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now adding 3 more makes the AfterFirst ready to fire
- tester.injectElements(1, 2, 3, 4, 5);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
@Test
public void testFireDeadline() throws Exception {
BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java
index 38d030e..1bff80a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java
@@ -18,12 +18,7 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,78 +30,6 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class AfterPaneTest {
- SimpleTriggerTester<IntervalWindow> tester;
- /**
- * Tests that the trigger does fire when enough elements are in a window, and that it only
- * fires that window (no leakage).
- */
- @Test
- public void testAfterPaneElementCountFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterPane.elementCountAtLeast(2),
- FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1); // [0, 10)
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2); // [0, 10)
- tester.injectElements(11); // [10, 20)
-
- assertTrue(tester.shouldFire(window)); // ready to fire
- tester.fireIfShouldFire(window); // and finished
- assertTrue(tester.isMarkedFinished(window));
-
- // But don't finish the other window
- assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
- }
-
- @Test
- public void testClear() throws Exception {
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterPane.elementCountAtLeast(2),
- FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1, 2, 3);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- tester.clearState(window);
- tester.assertCleared(window);
- }
-
- @Test
- public void testAfterPaneElementCountSessions() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterPane.elementCountAtLeast(2),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(
- 1, // in [1, 11)
- 2); // in [2, 12)
-
- assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11))));
- assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12))));
-
- tester.mergeWindows();
-
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12));
- assertTrue(tester.shouldFire(mergedWindow));
- tester.fireIfShouldFire(mergedWindow);
- assertTrue(tester.isMarkedFinished(mergedWindow));
-
- // Because we closed the previous window, we don't have it around to merge with. So there
- // will be a new FIRE_AND_FINISH result.
- tester.injectElements(
- 7, // in [7, 17)
- 9); // in [9, 19)
-
- tester.mergeWindows();
-
- IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19));
- assertTrue(tester.shouldFire(newMergedWindow));
- tester.fireIfShouldFire(newMergedWindow);
- assertTrue(tester.isMarkedFinished(newMergedWindow));
- }
-
@Test
public void testFireDeadline() throws Exception {
assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
index 13a7acf..4984d7c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java
@@ -18,12 +18,9 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
@@ -36,97 +33,6 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class AfterProcessingTimeTest {
- /**
- * Tests the basic property that the trigger does wait for processing time to be
- * far enough advanced.
- */
- @Test
- public void testAfterProcessingTimeFixedWindows() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- FixedWindows.of(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
-
- // Timer at 15
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
- tester.advanceProcessingTime(new Instant(12));
- assertFalse(tester.shouldFire(firstWindow));
-
- // Load up elements in the next window, timer at 17 for them
- tester.injectElements(11, 12, 13);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Not quite time to fire
- tester.advanceProcessingTime(new Instant(14));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
- tester.injectElements(2, 3);
-
- // Advance past the first timer and fire, finishing the first window
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.isMarkedFinished(firstWindow));
-
- // The next window fires and finishes now
- tester.advanceProcessingTime(new Instant(18));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(secondWindow);
- assertTrue(tester.isMarkedFinished(secondWindow));
- }
-
- /**
- * Tests that when windows merge, if the trigger is waiting for "N millis after the first
- * element" that it is relative to the earlier of the two merged windows.
- */
- @Test
- public void testClear() throws Exception {
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- FixedWindows.of(Duration.millis(10)));
-
- tester.injectElements(1, 2, 3);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- tester.clearState(window);
- tester.assertCleared(window);
- }
-
- @Test
- public void testAfterProcessingTimeWithMergingWindow() throws Exception {
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.advanceProcessingTime(new Instant(10));
- tester.injectElements(1); // in [1, 11), timer for 15
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.advanceProcessingTime(new Instant(12));
- tester.injectElements(3); // in [3, 13), timer for 17
- IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
- assertFalse(tester.shouldFire(secondWindow));
-
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
@Test
public void testFireDeadline() throws Exception {
assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
index 7e6e938..49d44c5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
@@ -18,12 +18,7 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -38,76 +33,6 @@ public class AfterSynchronizedProcessingTimeTest {
private Trigger underTest = new AfterSynchronizedProcessingTime();
@Test
- public void testAfterProcessingTimeWithFixedWindows() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- FixedWindows.of(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
-
- // Timer at 15
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
- tester.advanceProcessingTime(new Instant(12));
- assertFalse(tester.shouldFire(firstWindow));
-
- // Load up elements in the next window, timer at 17 for them
- tester.injectElements(11, 12, 13);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Not quite time to fire
- tester.advanceProcessingTime(new Instant(14));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
- tester.injectElements(2, 3);
-
- // Advance past the first timer and fire, finishing the first window
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.isMarkedFinished(firstWindow));
-
- // The next window fires and finishes now
- tester.advanceProcessingTime(new Instant(18));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(secondWindow);
- assertTrue(tester.isMarkedFinished(secondWindow));
- }
-
- @Test
- public void testAfterProcessingTimeWithMergingWindow() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- Sessions.withGapDuration(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
- tester.injectElements(1); // in [1, 11), timer for 15
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.advanceProcessingTime(new Instant(12));
- tester.injectElements(3); // in [3, 13), timer for 17
- IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
- assertFalse(tester.shouldFire(secondWindow));
-
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
public void testFireDeadline() throws Exception {
assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
underTest.getWatermarkThatGuaranteesFiring(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
index 084027b..a418d63 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
@@ -18,23 +18,10 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
/**
* Tests the {@link AfterWatermark} triggers.
@@ -42,301 +29,6 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class AfterWatermarkTest {
- @Mock private OnceTrigger mockEarly;
- @Mock private OnceTrigger mockLate;
-
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
- private static Trigger.OnElementContext anyElementContext() {
- return Mockito.<Trigger.OnElementContext>any();
- }
-
- private void injectElements(int... elements) throws Exception {
- for (int element : elements) {
- doNothing().when(mockEarly).onElement(anyElementContext());
- doNothing().when(mockLate).onElement(anyElementContext());
- tester.injectElements(element);
- }
- }
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- }
-
- public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
- throws Exception {
-
- // Don't fire due to mock saying no
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- assertFalse(tester.shouldFire(window)); // not ready
-
- // Fire due to mock trigger; early trigger is required to be a OnceTrigger
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(window)); // ready
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testEarlyAndAtWatermark() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(mockEarly),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- testRunningAsTrigger(mockEarly, window);
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testAtWatermarkAndLate() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withLateFirings(mockLate),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // No early firing, just double checking
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
- assertFalse(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- testRunningAsTrigger(mockLate, window);
- }
-
- @Test
- public void testEarlyAndAtWatermarkAndLate() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(mockEarly)
- .withLateFirings(mockLate),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- testRunningAsTrigger(mockEarly, window);
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- testRunningAsTrigger(mockLate, window);
- }
-
- /**
- * Tests that if the EOW is finished in both as well as the merged window, then
- * it is finished in the merged result.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testOnMergeAlreadyFinished() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterWatermark.pastEndOfWindow(),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.injectElements(1);
- tester.injectElements(5);
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Merging should leave it finished
- tester.mergeWindows();
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that the trigger rewinds to be non-finished in the merged window.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterWatermark.pastEndOfWindow(),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merging should re-activate the watermark trigger in the merged window
- tester.mergeWindows();
-
- // Confirm that we are not on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertFalse(tester.shouldFire(mergedWindow));
-
- // And confirm that advancing the watermark fires again
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that if the EOW is finished in both as well as the merged window, then
- * it is finished in the merged result.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterPane.elementCountAtLeast(100))
- .withLateFirings(AfterPane.elementCountAtLeast(1)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.injectElements(1);
- tester.injectElements(5);
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Merging should leave it on the late trigger
- tester.mergeWindows();
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that the trigger rewinds to be non-finished in the merged window.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testEarlyAndLateOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterPane.elementCountAtLeast(100))
- .withLateFirings(AfterPane.elementCountAtLeast(1)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merging should re-activate the early trigger in the merged window
- tester.mergeWindows();
-
- // Confirm that we are not on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertFalse(tester.shouldFire(mergedWindow));
-
- // And confirm that advancing the watermark fires again
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
@Test
public void testFromEndOfWindowToString() {
Trigger trigger = AfterWatermark.pastEndOfWindow();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
index 673e555..ee1c44a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
@@ -18,12 +18,7 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,131 +31,6 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class DefaultTriggerTest {
- SimpleTriggerTester<IntervalWindow> tester;
-
- @Test
- public void testDefaultTriggerFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- FixedWindows.of(Duration.millis(100)));
-
- tester.injectElements(
- 1, // [0, 100)
- 101); // [100, 200)
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200));
-
- // Advance the watermark almost to the end of the first window.
- tester.advanceInputWatermark(new Instant(99));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Advance watermark past end of the first window, which is then ready
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Fire, but the first window is still allowed to fire
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Advance watermark to 200, then both are ready
- tester.advanceInputWatermark(new Instant(200));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
-
- assertFalse(tester.isMarkedFinished(firstWindow));
- assertFalse(tester.isMarkedFinished(secondWindow));
- }
-
- @Test
- public void testDefaultTriggerSlidingWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
-
- tester.injectElements(
- 1, // [-50, 50), [0, 100)
- 50); // [0, 100), [50, 150)
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100));
- IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150));
-
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 50, the first becomes ready; it stays ready after firing
- tester.advanceInputWatermark(new Instant(50));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 99, the first is still the only one ready
- tester.advanceInputWatermark(new Instant(99));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 100, the first and second are ready
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
- tester.fireIfShouldFire(firstWindow);
-
- assertFalse(tester.isMarkedFinished(firstWindow));
- assertFalse(tester.isMarkedFinished(secondWindow));
- assertFalse(tester.isMarkedFinished(thirdWindow));
- }
-
- @Test
- public void testDefaultTriggerSessions() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- Sessions.withGapDuration(Duration.millis(100)));
-
- tester.injectElements(
- 1, // [1, 101)
- 50); // [50, 150)
- tester.mergeWindows();
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150));
-
- // Not ready in any window yet
- tester.advanceInputWatermark(new Instant(100));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // The first window is "ready": the caller owns knowledge of which windows are merged away
- tester.advanceInputWatermark(new Instant(149));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now ready on all windows
- tester.advanceInputWatermark(new Instant(150));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- assertTrue(tester.shouldFire(mergedWindow));
-
- // Ensure it repeats
- tester.fireIfShouldFire(mergedWindow);
- assertTrue(tester.shouldFire(mergedWindow));
-
- assertFalse(tester.isMarkedFinished(mergedWindow));
- }
-
@Test
public void testFireDeadline() throws Exception {
assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
index fb2b4d5..1052873 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
@@ -17,40 +17,26 @@
*/
package org.apache.beam.sdk.transforms.windowing;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Tests for {@link Never}.
- */
+/** Tests for {@link Never}. */
@RunWith(JUnit4.class)
public class NeverTest {
- private SimpleTriggerTester<IntervalWindow> triggerTester;
-
- @Before
- public void setup() throws Exception {
- triggerTester =
- TriggerTester.forTrigger(
- Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
+ @Test
+ public void testFireDeadline() throws Exception {
+ assertEquals(
+ BoundedWindow.TIMESTAMP_MAX_VALUE,
+ Never.ever()
+ .getWatermarkThatGuaranteesFiring(new IntervalWindow(new Instant(0), new Instant(10))));
}
@Test
- public void falseAfterEndOfWindow() throws Exception {
- triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
- IntervalWindow window =
- new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
- assertThat(triggerTester.shouldFire(window), is(false));
- triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
- assertThat(triggerTester.shouldFire(window), is(false));
+ public void testContinuation() throws Exception {
+ assertEquals(Never.ever(), Never.ever().getContinuationTrigger());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
index 7289d97..6e61e10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
@@ -18,13 +18,8 @@
package org.apache.beam.sdk.transforms.windowing;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,137 +31,6 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class OrFinallyTriggerTest {
- private SimpleTriggerTester<IntervalWindow> tester;
-
- /**
- * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
- * fires and finishes, the {@code OrFinally} also fires and finishes.
- */
- @Test
- public void testActualFiresAndFinishes() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- AfterPane.elementCountAtLeast(2),
- AfterPane.elementCountAtLeast(100)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // Not yet firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires and finishes
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
- * fires but does not finish, the {@code OrFinally} also fires and also does not
- * finish.
- */
- @Test
- public void testActualFiresOnly() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- AfterPane.elementCountAtLeast(100)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // Not yet firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires but does not finish
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // And again
- tester.injectElements(3, 4);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that if the first trigger rewinds to be non-finished in the merged window,
- * then it becomes the currently active trigger again, with real triggers.
- */
- @Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterPane.elementCountAtLeast(5)
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- // Finished the orFinally in the first window
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Set up second window where it is not done
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them, if the merged window were on the second trigger, it would be ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now adding 5 more makes the main trigger ready to fire
- tester.injectElements(1, 2, 3, 4, 5);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that for {@code OrFinally(actual, until)} when {@code actual}
- * fires but does not finish, then {@code until} fires and finishes, the
- * whole thing fires and finishes.
- */
- @Test
- public void testActualFiresButUntilFinishes() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- AfterPane.elementCountAtLeast(3)),
- FixedWindows.of(Duration.millis(10)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- // Before any firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires but doesn't finish
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // The until fires and finishes; the trigger is finished
- tester.injectElements(3);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
@Test
public void testFireDeadline() throws Exception {
BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
index 6e8930d..55cb77e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
@@ -19,14 +19,9 @@ package org.apache.beam.sdk.transforms.windowing;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
@@ -36,51 +31,17 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-/**
- * Tests for {@link Repeatedly}.
- */
+/** Tests for {@link Repeatedly}. */
@RunWith(JUnit4.class)
public class RepeatedlyTest {
@Mock private Trigger mockTrigger;
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception {
MockitoAnnotations.initMocks(this);
- tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn);
- }
-
- /**
- * Tests that onElement correctly passes the data on to the subtrigger.
- */
- @Test
- public void testOnElement() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
- tester.injectElements(37);
- verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
- }
-
- /**
- * Tests that the repeatedly is ready to fire whenever the subtrigger is ready.
- */
- @Test
- public void testShouldFire() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
-
- when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
- .thenReturn(false);
- assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
}
- /**
- * Tests that the watermark that guarantees firing is that of the subtrigger.
- */
+ /** Tests that the watermark that guarantees firing is that of the subtrigger. */
@Test
public void testFireDeadline() throws Exception {
setUp(FixedWindows.of(Duration.millis(10)));
@@ -107,118 +68,16 @@ public class RepeatedlyTest {
}
@Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them; the merged window then holds both elements, so the trigger is ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testRepeatedlyAfterFirstElementCount() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15)),
- AfterPane.elementCountAtLeast(5))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2, 3, 4, 5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15)),
- AfterPane.elementCountAtLeast(5))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyElementCount() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2, 3, 4, 5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyProcessingTime() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
-
- @Test
public void testToString() {
- Trigger trigger = Repeatedly.forever(new StubTrigger() {
- @Override
- public String toString() {
- return "innerTrigger";
- }
- });
+ Trigger trigger =
+ Repeatedly.forever(
+ new StubTrigger() {
+ @Override
+ public String toString() {
+ return "innerTrigger";
+ }
+ });
assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
index b258a79..0fc74e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
@@ -42,23 +42,6 @@ abstract class StubTrigger extends Trigger.OnceTrigger {
}
@Override
- protected void onOnlyFiring(TriggerContext context) throws Exception {
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- }
-
- @Override
- public boolean shouldFire(TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
index cfc03b2..2602f79 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
@@ -59,12 +59,6 @@ public class TriggerTest {
}
@Override
- public void onElement(Trigger.OnElementContext c) { }
-
- @Override
- public void onMerge(Trigger.OnMergeContext c) { }
-
- @Override
protected Trigger getContinuationTrigger(
List<Trigger> continuationTriggers) {
return null;
@@ -74,14 +68,6 @@ public class TriggerTest {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return null;
}
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
}
private static class Trigger2 extends Trigger {
@@ -91,12 +77,6 @@ public class TriggerTest {
}
@Override
- public void onElement(Trigger.OnElementContext c) { }
-
- @Override
- public void onMerge(Trigger.OnMergeContext c) { }
-
- @Override
protected Trigger getContinuationTrigger(
List<Trigger> continuationTriggers) {
return null;
@@ -106,13 +86,5 @@ public class TriggerTest {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return null;
}
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
index 1e3a1ff..befc07e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
@@ -92,16 +92,6 @@ public class ExecutableTriggerTest {
}
@Override
- public void onElement(OnElementContext c) throws Exception { }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception { }
-
- @Override
- public void clear(TriggerContext c) throws Exception {
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
@@ -115,13 +105,5 @@ public class ExecutableTriggerTest {
public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
}
-
- @Override
- public boolean shouldFire(TriggerContext c) {
- return false;
- }
-
- @Override
- public void onFire(TriggerContext c) { }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
deleted file mode 100644
index 7f74620..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersBitSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersBitSetTest {
- /**
- * Tests that after a trigger is set to finished, it reads back as finished.
- */
- @Test
- public void testSetGet() {
- FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no
- * others.
- */
- @Test
- public void testClearRecursively() {
- FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
- }
-
- @Test
- public void testCopy() throws Exception {
- FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10);
- assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet())));
- }
-}