You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:42 UTC
[08/17] incubator-beam git commit: Move triggers to runners-core
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
deleted file mode 100644
index 7e6e938..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link AfterSynchronizedProcessingTime}.
- */
-@RunWith(JUnit4.class)
-public class AfterSynchronizedProcessingTimeTest {
-
- private Trigger underTest = new AfterSynchronizedProcessingTime();
-
- @Test
- public void testAfterProcessingTimeWithFixedWindows() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- FixedWindows.of(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
-
- // Timer at 15
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
- tester.advanceProcessingTime(new Instant(12));
- assertFalse(tester.shouldFire(firstWindow));
-
- // Load up elements in the next window, timer at 17 for them
- tester.injectElements(11, 12, 13);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Not quite time to fire
- tester.advanceProcessingTime(new Instant(14));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first
- tester.injectElements(2, 3);
-
- // Advance past the first timer and fire, finishing the first window
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.isMarkedFinished(firstWindow));
-
- // The next window fires and finishes now
- tester.advanceProcessingTime(new Instant(18));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(secondWindow);
- assertTrue(tester.isMarkedFinished(secondWindow));
- }
-
- @Test
- public void testAfterProcessingTimeWithMergingWindow() throws Exception {
- Duration windowDuration = Duration.millis(10);
- SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
- AfterProcessingTime
- .pastFirstElementInPane()
- .plusDelayOf(Duration.millis(5)),
- Sessions.withGapDuration(windowDuration));
-
- tester.advanceProcessingTime(new Instant(10));
- tester.injectElements(1); // in [1, 11), timer for 15
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.advanceProcessingTime(new Instant(12));
- tester.injectElements(3); // in [3, 13), timer for 17
- IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13));
- assertFalse(tester.shouldFire(secondWindow));
-
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13));
-
- tester.advanceProcessingTime(new Instant(16));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- underTest.getWatermarkThatGuaranteesFiring(
- new IntervalWindow(new Instant(0), new Instant(10))));
- }
-
- @Test
- public void testContinuation() throws Exception {
- assertEquals(underTest, underTest.getContinuationTrigger());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
deleted file mode 100644
index 084027b..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests the {@link AfterWatermark} triggers.
- */
-@RunWith(JUnit4.class)
-public class AfterWatermarkTest {
-
- @Mock private OnceTrigger mockEarly;
- @Mock private OnceTrigger mockLate;
-
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
- private static Trigger.OnElementContext anyElementContext() {
- return Mockito.<Trigger.OnElementContext>any();
- }
-
- private void injectElements(int... elements) throws Exception {
- for (int element : elements) {
- doNothing().when(mockEarly).onElement(anyElementContext());
- doNothing().when(mockLate).onElement(anyElementContext());
- tester.injectElements(element);
- }
- }
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- }
-
- public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
- throws Exception {
-
- // Don't fire due to mock saying no
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- assertFalse(tester.shouldFire(window)); // not ready
-
- // Fire due to mock trigger; early trigger is required to be a OnceTrigger
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(window)); // ready
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testEarlyAndAtWatermark() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(mockEarly),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- testRunningAsTrigger(mockEarly, window);
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testAtWatermarkAndLate() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withLateFirings(mockLate),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // No early firing, just double checking
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true);
- assertFalse(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- testRunningAsTrigger(mockLate, window);
- }
-
- @Test
- public void testEarlyAndAtWatermarkAndLate() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(mockEarly)
- .withLateFirings(mockLate),
- FixedWindows.of(Duration.millis(100)));
-
- injectElements(1);
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- testRunningAsTrigger(mockEarly, window);
-
- // Fire due to watermark
- when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- testRunningAsTrigger(mockLate, window);
- }
-
- /**
- * Tests that if the EOW is finished in both as well as the merged window, then
- * it is finished in the merged result.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testOnMergeAlreadyFinished() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterWatermark.pastEndOfWindow(),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.injectElements(1);
- tester.injectElements(5);
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Merging should leave it finished
- tester.mergeWindows();
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that the trigger rewinds to be non-finished in the merged window.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterWatermark.pastEndOfWindow(),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Confirm that we are on the second trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merging should re-activate the watermark trigger in the merged window
- tester.mergeWindows();
-
- // Confirm that we are not on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertFalse(tester.shouldFire(mergedWindow));
-
- // And confirm that advancing the watermark fires again
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that if the EOW is finished in both as well as the merged window, then
- * it is finished in the merged result.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterPane.elementCountAtLeast(100))
- .withLateFirings(AfterPane.elementCountAtLeast(1)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.injectElements(1);
- tester.injectElements(5);
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
- tester.fireIfShouldFire(secondWindow);
-
- // Merging should leave it on the late trigger
- tester.mergeWindows();
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that the trigger rewinds to be non-finished in the merged window.
- *
- * <p>Because windows are discarded when a trigger finishes, we need to embed this
- * in a sequence in order to check that it is re-activated. So this test is potentially
- * sensitive to other triggers' correctness.
- */
- @Test
- public void testEarlyAndLateOnMergeRewinds() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterPane.elementCountAtLeast(100))
- .withLateFirings(AfterPane.elementCountAtLeast(1)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- tester.injectElements(5);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
-
- // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Confirm that we are on the late trigger by probing
- assertFalse(tester.shouldFire(firstWindow));
- tester.injectElements(1);
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Merging should re-activate the early trigger in the merged window
- tester.mergeWindows();
-
- // Confirm that we are not on the second trigger by probing
- assertFalse(tester.shouldFire(mergedWindow));
- tester.injectElements(1);
- assertFalse(tester.shouldFire(mergedWindow));
-
- // And confirm that advancing the watermark fires again
- tester.advanceInputWatermark(new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testFromEndOfWindowToString() {
- Trigger trigger = AfterWatermark.pastEndOfWindow();
- assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
- }
-
- @Test
- public void testEarlyFiringsToString() {
- Trigger trigger = AfterWatermark.pastEndOfWindow().withEarlyFirings(StubTrigger.named("t1"));
-
- assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1)", trigger.toString());
- }
-
- @Test
- public void testLateFiringsToString() {
- Trigger trigger = AfterWatermark.pastEndOfWindow().withLateFirings(StubTrigger.named("t1"));
-
- assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString());
- }
-
- @Test
- public void testEarlyAndLateFiringsToString() {
- Trigger trigger =
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(StubTrigger.named("t1"))
- .withLateFirings(StubTrigger.named("t2"));
-
- assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)",
- trigger.toString());
- }
-
- @Test
- public void testToStringExcludesNeverTrigger() {
- Trigger trigger =
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(Never.ever())
- .withLateFirings(Never.ever());
-
- assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
deleted file mode 100644
index 673e555..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests the {@link DefaultTrigger}, which should be equivalent to
- * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
- */
-@RunWith(JUnit4.class)
-public class DefaultTriggerTest {
-
- SimpleTriggerTester<IntervalWindow> tester;
-
- @Test
- public void testDefaultTriggerFixedWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- FixedWindows.of(Duration.millis(100)));
-
- tester.injectElements(
- 1, // [0, 100)
- 101); // [100, 200)
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200));
-
- // Advance the watermark almost to the end of the first window.
- tester.advanceInputWatermark(new Instant(99));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Advance watermark past end of the first window, which is then ready
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Fire, but the first window is still allowed to fire
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Advance watermark to 200, then both are ready
- tester.advanceInputWatermark(new Instant(200));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
-
- assertFalse(tester.isMarkedFinished(firstWindow));
- assertFalse(tester.isMarkedFinished(secondWindow));
- }
-
- @Test
- public void testDefaultTriggerSlidingWindows() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));
-
- tester.injectElements(
- 1, // [-50, 50), [0, 100)
- 50); // [0, 100), [50, 150)
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100));
- IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150));
-
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 50, the first becomes ready; it stays ready after firing
- tester.advanceInputWatermark(new Instant(50));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
- tester.fireIfShouldFire(firstWindow);
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 99, the first is still the only one ready
- tester.advanceInputWatermark(new Instant(99));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
-
- // At 100, the first and second are ready
- tester.advanceInputWatermark(new Instant(100));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(thirdWindow));
- tester.fireIfShouldFire(firstWindow);
-
- assertFalse(tester.isMarkedFinished(firstWindow));
- assertFalse(tester.isMarkedFinished(secondWindow));
- assertFalse(tester.isMarkedFinished(thirdWindow));
- }
-
- @Test
- public void testDefaultTriggerSessions() throws Exception {
- tester = TriggerTester.forTrigger(
- DefaultTrigger.of(),
- Sessions.withGapDuration(Duration.millis(100)));
-
- tester.injectElements(
- 1, // [1, 101)
- 50); // [50, 150)
- tester.mergeWindows();
-
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150));
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150));
-
- // Not ready in any window yet
- tester.advanceInputWatermark(new Instant(100));
- assertFalse(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // The first window is "ready": the caller owns knowledge of which windows are merged away
- tester.advanceInputWatermark(new Instant(149));
- assertTrue(tester.shouldFire(firstWindow));
- assertFalse(tester.shouldFire(secondWindow));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now ready on all windows
- tester.advanceInputWatermark(new Instant(150));
- assertTrue(tester.shouldFire(firstWindow));
- assertTrue(tester.shouldFire(secondWindow));
- assertTrue(tester.shouldFire(mergedWindow));
-
- // Ensure it repeats
- tester.fireIfShouldFire(mergedWindow);
- assertTrue(tester.shouldFire(mergedWindow));
-
- assertFalse(tester.isMarkedFinished(mergedWindow));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring(
- new IntervalWindow(new Instant(0), new Instant(10))));
- assertEquals(GlobalWindow.INSTANCE.maxTimestamp(),
- DefaultTrigger.of().getWatermarkThatGuaranteesFiring(GlobalWindow.INSTANCE));
- }
-
- @Test
- public void testContinuation() throws Exception {
- assertEquals(DefaultTrigger.of(), DefaultTrigger.of().getContinuationTrigger());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
deleted file mode 100644
index fb2b4d5..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link Never}.
- */
-@RunWith(JUnit4.class)
-public class NeverTest {
- private SimpleTriggerTester<IntervalWindow> triggerTester;
-
- @Before
- public void setup() throws Exception {
- triggerTester =
- TriggerTester.forTrigger(
- Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
- }
-
- @Test
- public void falseAfterEndOfWindow() throws Exception {
- triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
- IntervalWindow window =
- new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
- assertThat(triggerTester.shouldFire(window), is(false));
- triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
- assertThat(triggerTester.shouldFire(window), is(false));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
deleted file mode 100644
index 7289d97..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link OrFinallyTrigger}.
- */
-@RunWith(JUnit4.class)
-public class OrFinallyTriggerTest {
-
- private SimpleTriggerTester<IntervalWindow> tester;
-
- /**
- * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
- * fires and finishes, the {@code OrFinally} also fires and finishes.
- */
- @Test
- public void testActualFiresAndFinishes() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- AfterPane.elementCountAtLeast(2),
- AfterPane.elementCountAtLeast(100)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // Not yet firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires and finishes
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
- * fires but does not finish, the {@code OrFinally} also fires and also does not
- * finish.
- */
- @Test
- public void testActualFiresOnly() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- AfterPane.elementCountAtLeast(100)),
- FixedWindows.of(Duration.millis(100)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
-
- // Not yet firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires but does not finish
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // And again
- tester.injectElements(3, 4);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
- }
-
- /**
- * Tests that if the first trigger rewinds to be non-finished in the merged window,
- * then it becomes the currently active trigger again, with real triggers.
- */
- @Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- AfterEach.inOrder(
- AfterPane.elementCountAtLeast(5)
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
- Sessions.withGapDuration(Duration.millis(10)));
-
- // Finished the orFinally in the first window
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
- tester.advanceInputWatermark(new Instant(11));
- assertTrue(tester.shouldFire(firstWindow));
- tester.fireIfShouldFire(firstWindow);
-
- // Set up second window where it is not done
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them, if the merged window were on the second trigger, it would be ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertFalse(tester.shouldFire(mergedWindow));
-
- // Now adding 3 more makes the main trigger ready to fire
- tester.injectElements(1, 2, 3, 4, 5);
- tester.mergeWindows();
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- /**
- * Tests that for {@code OrFinally(actual, until)} when {@code actual}
- * fires but does not finish, then {@code until} fires and finishes, the
- * whole thing fires and finished.
- */
- @Test
- public void testActualFiresButUntilFinishes() throws Exception {
- tester = TriggerTester.forTrigger(
- new OrFinallyTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- AfterPane.elementCountAtLeast(3)),
- FixedWindows.of(Duration.millis(10)));
-
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- // Before any firing
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
- assertFalse(tester.isMarkedFinished(window));
-
- // The actual fires but doesn't finish
- tester.injectElements(2);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.isMarkedFinished(window));
-
- // The until fires and finishes; the trigger is finished
- tester.injectElements(3);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertTrue(tester.isMarkedFinished(window));
- }
-
- @Test
- public void testFireDeadline() throws Exception {
- BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
- assertEquals(new Instant(9),
- Repeatedly.forever(AfterWatermark.pastEndOfWindow())
- .getWatermarkThatGuaranteesFiring(window));
- assertEquals(new Instant(9), Repeatedly.forever(AfterWatermark.pastEndOfWindow())
- .orFinally(AfterPane.elementCountAtLeast(1))
- .getWatermarkThatGuaranteesFiring(window));
- assertEquals(new Instant(9), Repeatedly.forever(AfterPane.elementCountAtLeast(1))
- .orFinally(AfterWatermark.pastEndOfWindow())
- .getWatermarkThatGuaranteesFiring(window));
- assertEquals(new Instant(9),
- AfterPane.elementCountAtLeast(100)
- .orFinally(AfterWatermark.pastEndOfWindow())
- .getWatermarkThatGuaranteesFiring(window));
-
- assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))
- .orFinally(AfterPane.elementCountAtLeast(10))
- .getWatermarkThatGuaranteesFiring(window));
- }
-
- @Test
- public void testContinuation() throws Exception {
- OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane();
- OnceTrigger triggerB = AfterWatermark.pastEndOfWindow();
- Trigger aOrFinallyB = triggerA.orFinally(triggerB);
- Trigger bOrFinallyA = triggerB.orFinally(triggerA);
- assertEquals(
- Repeatedly.forever(
- triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
- aOrFinallyB.getContinuationTrigger());
- assertEquals(
- Repeatedly.forever(
- triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
- bOrFinallyA.getContinuationTrigger());
- }
-
- @Test
- public void testToString() {
- Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2"));
- assertEquals("t1.orFinally(t2)", trigger.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
deleted file mode 100644
index 6e8930d..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link Repeatedly}.
- */
-@RunWith(JUnit4.class)
-public class RepeatedlyTest {
-
- @Mock private Trigger mockTrigger;
- private SimpleTriggerTester<IntervalWindow> tester;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
-
- public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws Exception {
- MockitoAnnotations.initMocks(this);
- tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn);
- }
-
- /**
- * Tests that onElement correctly passes the data on to the subtrigger.
- */
- @Test
- public void testOnElement() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
- tester.injectElements(37);
- verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
- }
-
- /**
- * Tests that the repeatedly is ready to fire whenever the subtrigger is ready.
- */
- @Test
- public void testShouldFire() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
-
- when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
- .thenReturn(false);
- assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10))));
- }
-
- /**
- * Tests that the watermark that guarantees firing is that of the subtrigger.
- */
- @Test
- public void testFireDeadline() throws Exception {
- setUp(FixedWindows.of(Duration.millis(10)));
- IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
- Instant arbitraryInstant = new Instant(34957849);
-
- when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any()))
- .thenReturn(arbitraryInstant);
-
- assertThat(
- Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window),
- equalTo(arbitraryInstant));
- }
-
- @Test
- public void testContinuation() throws Exception {
- Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
- Trigger repeatedly = Repeatedly.forever(trigger);
- assertEquals(
- Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger());
- assertEquals(
- Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()),
- repeatedly.getContinuationTrigger().getContinuationTrigger());
- }
-
- @Test
- public void testShouldFireAfterMerge() throws Exception {
- tester = TriggerTester.forTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
- Sessions.withGapDuration(Duration.millis(10)));
-
- tester.injectElements(1);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11));
- assertFalse(tester.shouldFire(firstWindow));
-
- tester.injectElements(5);
- IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15));
- assertFalse(tester.shouldFire(secondWindow));
-
- // Merge them, if the merged window were on the second trigger, it would be ready
- tester.mergeWindows();
- IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15));
- assertTrue(tester.shouldFire(mergedWindow));
- }
-
- @Test
- public void testRepeatedlyAfterFirstElementCount() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15)),
- AfterPane.elementCountAtLeast(5))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2, 3, 4, 5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterFirst.of(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15)),
- AfterPane.elementCountAtLeast(5))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyElementCount() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.injectElements(2, 3, 4, 5);
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
- @Test
- public void testRepeatedlyProcessingTime() throws Exception {
- SimpleTriggerTester<GlobalWindow> tester =
- TriggerTester.forTrigger(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(15))),
- new GlobalWindows());
-
- GlobalWindow window = GlobalWindow.INSTANCE;
-
- tester.injectElements(1);
- assertFalse(tester.shouldFire(window));
-
- tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15)));
- assertTrue(tester.shouldFire(window));
- tester.fireIfShouldFire(window);
- assertFalse(tester.shouldFire(window));
- }
-
-
- @Test
- public void testToString() {
- Trigger trigger = Repeatedly.forever(new StubTrigger() {
- @Override
- public String toString() {
- return "innerTrigger";
- }
- });
-
- assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
deleted file mode 100644
index b258a79..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.Lists;
-import java.util.List;
-import org.joda.time.Instant;
-
-/**
- * No-op {@link OnceTrigger} implementation for testing.
- */
-abstract class StubTrigger extends Trigger.OnceTrigger {
- /**
- * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}.
- */
- static StubTrigger named(final String name) {
- return new StubTrigger() {
- @Override
- public String toString() {
- return name;
- }
- };
- }
-
- protected StubTrigger() {
- super(Lists.<Trigger>newArrayList());
- }
-
- @Override
- protected void onOnlyFiring(TriggerContext context) throws Exception {
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- }
-
- @Override
- public boolean shouldFire(TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return null;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
deleted file mode 100644
index cfc03b2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.List;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link Trigger}.
- */
-@RunWith(JUnit4.class)
-public class TriggerTest {
-
- @Test
- public void testTriggerToString() throws Exception {
- assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString());
- assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())",
- Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString());
- }
-
- @Test
- public void testIsCompatible() throws Exception {
- assertTrue(new Trigger1(null).isCompatible(new Trigger1(null)));
- assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))
- .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
-
- assertFalse(new Trigger1(null).isCompatible(new Trigger2(null)));
- assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null)))
- .isCompatible(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))));
- }
-
- private static class Trigger1 extends Trigger {
-
- private Trigger1(List<Trigger> subTriggers) {
- super(subTriggers);
- }
-
- @Override
- public void onElement(Trigger.OnElementContext c) { }
-
- @Override
- public void onMerge(Trigger.OnMergeContext c) { }
-
- @Override
- protected Trigger getContinuationTrigger(
- List<Trigger> continuationTriggers) {
- return null;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return null;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
- }
-
- private static class Trigger2 extends Trigger {
-
- private Trigger2(List<Trigger> subTriggers) {
- super(subTriggers);
- }
-
- @Override
- public void onElement(Trigger.OnElementContext c) { }
-
- @Override
- public void onMerge(Trigger.OnMergeContext c) { }
-
- @Override
- protected Trigger getContinuationTrigger(
- List<Trigger> continuationTriggers) {
- return null;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return null;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return false;
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
deleted file mode 100644
index 1e3a1ff..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ExecutableTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ExecutableTriggerTest {
-
- @Test
- public void testIndexAssignmentLeaf() throws Exception {
- StubTrigger t1 = new StubTrigger();
- ExecutableTrigger executable = ExecutableTrigger.create(t1);
- assertEquals(0, executable.getTriggerIndex());
- }
-
- @Test
- public void testIndexAssignmentOneLevel() throws Exception {
- StubTrigger t1 = new StubTrigger();
- StubTrigger t2 = new StubTrigger();
- StubTrigger t = new StubTrigger(t1, t2);
-
- ExecutableTrigger executable = ExecutableTrigger.create(t);
-
- assertEquals(0, executable.getTriggerIndex());
- assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
- assertSame(t1, executable.subTriggers().get(0).getSpec());
- assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
- assertSame(t2, executable.subTriggers().get(1).getSpec());
- }
-
- @Test
- public void testIndexAssignmentTwoLevel() throws Exception {
- StubTrigger t11 = new StubTrigger();
- StubTrigger t12 = new StubTrigger();
- StubTrigger t13 = new StubTrigger();
- StubTrigger t14 = new StubTrigger();
- StubTrigger t21 = new StubTrigger();
- StubTrigger t22 = new StubTrigger();
- StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
- StubTrigger t2 = new StubTrigger(t21, t22);
- StubTrigger t = new StubTrigger(t1, t2);
-
- ExecutableTrigger executable = ExecutableTrigger.create(t);
-
- assertEquals(0, executable.getTriggerIndex());
- assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
- assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
- assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
-
- assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
- assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
- assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
- }
-
- private static class StubTrigger extends Trigger {
-
- @SafeVarargs
- protected StubTrigger(Trigger... subTriggers) {
- super(Arrays.asList(subTriggers));
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception { }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception { }
-
- @Override
- public void clear(TriggerContext c) throws Exception {
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public boolean isCompatible(Trigger other) {
- return false;
- }
-
- @Override
- public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
-
- @Override
- public boolean shouldFire(TriggerContext c) {
- return false;
- }
-
- @Override
- public void onFire(TriggerContext c) { }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
deleted file mode 100644
index 7f74620..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersBitSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersBitSetTest {
- /**
- * Tests that after a trigger is set to finished, it reads back as finished.
- */
- @Test
- public void testSetGet() {
- FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
- * others.
- */
- @Test
- public void testClearRecursively() {
- FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
- }
-
- @Test
- public void testCopy() throws Exception {
- FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10);
- assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
deleted file mode 100644
index a66f74f..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-
-/**
- * Generalized tests for {@link FinishedTriggers} implementations.
- */
-public class FinishedTriggersProperties {
- /**
- * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set
- * finished, it is correctly reported as finished.
- */
- public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertFalse(finishedSet.isFinished(trigger));
- finishedSet.setFinished(trigger, true);
- assertTrue(finishedSet.isFinished(trigger));
- }
-
- /**
- * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly
- * reported as finished.
- */
- public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
- ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
- AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
- AfterAll.of(
- AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
- verifyGetAfterSet(finishedSet, trigger);
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
- * others.
- */
- public static void verifyClearRecursively(FinishedTriggers finishedSet) {
- ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
- AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
- AfterAll.of(
- AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
- // Set them all finished. This method is not on a trigger as it makes no sense outside tests.
- setFinishedRecursively(finishedSet, trigger);
- assertTrue(finishedSet.isFinished(trigger));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
-
- // Clear just the second AfterAll
- finishedSet.clearRecursively(trigger.subTriggers().get(1));
-
- // Check that the first and all that are still finished
- assertTrue(finishedSet.isFinished(trigger));
- verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
- verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
- }
-
- private static void setFinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- finishedSet.setFinished(trigger, true);
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- setFinishedRecursively(finishedSet, subTrigger);
- }
- }
-
- private static void verifyFinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertTrue(finishedSet.isFinished(trigger));
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- verifyFinishedRecursively(finishedSet, subTrigger);
- }
- }
-
- private static void verifyUnfinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertFalse(finishedSet.isFinished(trigger));
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- verifyUnfinishedRecursively(finishedSet, subTrigger);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
deleted file mode 100644
index 072d264..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import java.util.HashSet;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersSetTest {
- /**
- * Tests that after a trigger is set to finished, it reads back as finished.
- */
- @Test
- public void testSetGet() {
- FinishedTriggersProperties.verifyGetAfterSet(
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
- * others.
- */
- @Test
- public void testClearRecursively() {
- FinishedTriggersProperties.verifyClearRecursively(
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
- }
-
- @Test
- public void testCopy() throws Exception {
- FinishedTriggersSet finishedSet =
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
- assertThat(finishedSet.copy().getFinishedTriggers(),
- not(theInstance(finishedSet.getFinishedTriggers())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
deleted file mode 100644
index 83077f4..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ReshuffleTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ReshuffleTriggerTest {
-
- /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */
- public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() {
- return new ReshuffleTrigger<>();
- }
-
- @Test
- public void testShouldFire() throws Exception {
- TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
- new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
- IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400));
- assertTrue(tester.shouldFire(arbitraryWindow));
- }
-
- @Test
- public void testOnTimer() throws Exception {
- TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
- new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
- IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200));
- tester.fireIfShouldFire(arbitraryWindow);
- assertFalse(tester.isMarkedFinished(arbitraryWindow));
- }
-
- @Test
- public void testToString() {
- Trigger trigger = new ReshuffleTrigger<>();
- assertEquals("ReshuffleTrigger()", trigger.toString());
- }
-}