You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:43:50 UTC
[03/11] incubator-beam git commit: Put classes in runners-core
package into runners.core namespace
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
deleted file mode 100644
index f8ad291..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link PushbackSideInputDoFnRunner}.
- */
-@RunWith(JUnit4.class)
-public class PushbackSideInputDoFnRunnerTest {
- @Mock private ReadyCheckingSideInputReader reader;
- private TestDoFnRunner<Integer, Integer> underlying;
- private PCollectionView<Integer> singletonView;
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- TestPipeline p = TestPipeline.create();
- PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
- singletonView =
- created
- .apply(Window.into(new IdentitySideInputWindowFn()))
- .apply(Sum.integersGlobally().asSingletonView());
-
- underlying = new TestDoFnRunner<>();
- }
-
- private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
- ImmutableList<PCollectionView<?>> views) {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- PushbackSideInputDoFnRunner.create(underlying, views, reader);
- runner.startBundle();
- return runner;
- }
-
- @Test
- public void startFinishBundleDelegates() {
- PushbackSideInputDoFnRunner runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- assertThat(underlying.started, is(true));
- assertThat(underlying.finished, is(false));
- runner.finishBundle();
- assertThat(underlying.finished, is(true));
- }
-
- @Test
- public void processElementSideInputNotReady() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(false);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- WindowedValue<Integer> oneWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> oneWindowPushback =
- runner.processElementInReadyWindows(oneWindow);
- assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
- assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
- }
-
- @Test
- public void processElementSideInputNotReadyMultipleWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(false);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
- assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
- }
-
- @Test
- public void processElementSideInputNotReadySomeWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
- .thenReturn(false);
- when(
- reader.isReady(
- Mockito.eq(singletonView),
- org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
- .thenReturn(true);
-
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
-
- IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
- IntervalWindow bigWindow =
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
- PaneInfo.NO_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(
- multiWindowPushback,
- containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
- assertThat(underlying.inputElems,
- containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
- WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
- }
-
- @Test
- public void processElementSideInputReadyAllWindows() {
- when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
- .thenReturn(true);
-
- ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
- PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, emptyIterable());
- assertThat(underlying.inputElems,
- containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
- }
-
- @Test
- public void processElementNoSideInputs() {
- PushbackSideInputDoFnRunner<Integer, Integer> runner =
- createRunner(ImmutableList.<PCollectionView<?>>of());
-
- WindowedValue<Integer> multiWindow =
- WindowedValue.of(
- 2,
- new Instant(-2),
- ImmutableList.of(
- new IntervalWindow(new Instant(-500L), new Instant(0L)),
- new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
- GlobalWindow.INSTANCE),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- Iterable<WindowedValue<Integer>> multiWindowPushback =
- runner.processElementInReadyWindows(multiWindow);
- assertThat(multiWindowPushback, emptyIterable());
- assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
- }
-
- private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
- List<WindowedValue<InputT>> inputElems;
- private boolean started = false;
- private boolean finished = false;
-
- @Override
- public void startBundle() {
- started = true;
- inputElems = new ArrayList<>();
- }
-
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- inputElems.add(elem);
- }
-
- @Override
- public void finishBundle() {
- finished = true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
deleted file mode 100644
index 647495c..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ /dev/null
@@ -1,1442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
-import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-
-import com.google.common.collect.Iterables;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.beam.sdk.WindowMatchers;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Tests for {@link ReduceFnRunner}. These tests instantiate a full "stack" of
- * {@link ReduceFnRunner} with enclosed {@link ReduceFn}, down to the installed {@link Trigger}
- * (sometimes mocked). They proceed by injecting elements and advancing watermark and
- * processing time, then verifying produced panes and counters.
- */
-@RunWith(JUnit4.class)
-public class ReduceFnRunnerTest {
- @Mock private SideInputReader mockSideInputReader;
- private Trigger mockTrigger;
- private PCollectionView<Integer> mockView;
-
- private IntervalWindow firstWindow;
-
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
- }
- private static Trigger.OnElementContext anyElementContext() {
- return Mockito.<Trigger.OnElementContext>any();
- }
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- mockTrigger = mock(Trigger.class, withSettings().serializable());
-
- @SuppressWarnings("unchecked")
- PCollectionView<Integer> mockViewUnchecked =
- mock(PCollectionView.class, withSettings().serializable());
- mockView = mockViewUnchecked;
- firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
- }
-
- private void injectElement(ReduceFnTester<Integer, ?, IntervalWindow> tester, int element)
- throws Exception {
- doNothing().when(mockTrigger).onElement(anyElementContext());
- tester.injectElements(TimestampedValue.of(element, new Instant(element)));
- }
-
- private void triggerShouldFinish(Trigger mockTrigger) throws Exception {
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Exception {
- @SuppressWarnings("unchecked")
- Trigger.TriggerContext context =
- (Trigger.TriggerContext) invocation.getArguments()[0];
- context.trigger().setFinished(true);
- return null;
- }
- })
- .when(mockTrigger).onFire(anyTriggerContext());
- }
-
- @Test
- public void testOnElementBufferingDiscarding() throws Exception {
- // Test basic execution of a trigger using a non-combining window set and discarding mode.
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // Pane of {1, 2}
- injectElement(tester, 1);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 2);
- assertThat(tester.extractOutput(),
- contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
-
- // Pane of just 3, and finish
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 3);
- assertThat(tester.extractOutput(),
- contains(isSingleWindowedValue(containsInAnyOrder(3), 3, 0, 10)));
- assertTrue(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
-
- // This element shouldn't be seen, because the trigger has finished
- injectElement(tester, 4);
-
- assertEquals(1, tester.getElementsDroppedDueToClosedWindow());
- }
-
- @Test
- public void testOnElementBufferingAccumulating() throws Exception {
- // Test basic execution of a trigger using a non-combining window set and accumulating mode.
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- injectElement(tester, 1);
-
- // Fires {1, 2}
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 2);
-
- // Fires {1, 2, 3} because we are in accumulating mode
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 3);
-
- // This element shouldn't be seen, because the trigger has finished
- injectElement(tester, 4);
-
- assertThat(
- tester.extractOutput(),
- contains(
- isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
- isSingleWindowedValue(containsInAnyOrder(1, 2, 3), 3, 0, 10)));
- assertTrue(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
- }
-
- @Test
- public void testOnElementCombiningDiscarding() throws Exception {
- // Test basic execution of a trigger using a non-combining window set and discarding mode.
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- FixedWindows.of(Duration.millis(10)), mockTrigger, AccumulationMode.DISCARDING_FIRED_PANES,
- new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of(), Duration.millis(100));
-
- injectElement(tester, 2);
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 3);
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 4);
-
- // This element shouldn't be seen, because the trigger has finished
- injectElement(tester, 6);
-
- assertThat(
- tester.extractOutput(),
- contains(
- isSingleWindowedValue(equalTo(5), 2, 0, 10),
- isSingleWindowedValue(equalTo(4), 4, 0, 10)));
- assertTrue(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
- }
-
- /**
- * Tests that the garbage collection time for a fixed window does not overflow the end of time.
- */
- @Test
- public void testFixedWindowEndOfTimeGarbageCollection() throws Exception {
-
- Duration allowedLateness = Duration.standardDays(365);
- Duration windowSize = Duration.millis(10);
- WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize);
-
- // This timestamp falls into a window where the end of the window is before the end of the
- // global window - the "end of time" - yet its expiration time is after.
- final Instant elementTimestamp =
- GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1);
-
- IntervalWindow window = Iterables.getOnlyElement(
- windowFn.assignWindows(
- windowFn.new AssignContext() {
- @Override
- public Object element() {
- throw new UnsupportedOperationException();
- }
- @Override
- public Instant timestamp() {
- return elementTimestamp;
- }
-
- @Override
- public BoundedWindow window() {
- throw new UnsupportedOperationException();
- }
- }));
-
- assertTrue(
- window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp()));
- assertTrue(
- window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
-
- // Test basic execution of a trigger using a non-combining window set and accumulating mode.
- ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(
- windowFn,
- AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()),
- AccumulationMode.DISCARDING_FIRED_PANES,
- new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(),
- allowedLateness);
-
- tester.injectElements(TimestampedValue.of(13, elementTimestamp));
-
- // Should fire ON_TIME pane and there will be a checkState that the cleanup time
- // is prior to timestamp max value
- tester.advanceInputWatermark(window.maxTimestamp());
-
- // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner)
- assertThat(tester.extractOutput(), emptyIterable());
-
- tester.injectElements(TimestampedValue.of(42, elementTimestamp));
-
- // Now the final pane should fire, demonstrating that the GC time was truncated
- tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp());
- assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
- }
-
- @Test
- public void testOnElementCombiningAccumulating() throws Exception {
- // Test basic execution of a trigger using a non-combining window set and accumulating mode.
- ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(), Duration.millis(100));
-
- injectElement(tester, 1);
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 2);
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 3);
-
- // This element shouldn't be seen, because the trigger has finished
- injectElement(tester, 4);
-
- assertThat(
- tester.extractOutput(),
- contains(
- isSingleWindowedValue(equalTo(3), 1, 0, 10),
- isSingleWindowedValue(equalTo(6), 3, 0, 10)));
- assertTrue(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
- }
-
- @Test
- public void testOnElementCombiningWithContext() throws Exception {
- Integer expectedValue = 5;
- WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy
- .of(FixedWindows.of(Duration.millis(10)))
- .withTrigger(mockTrigger)
- .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withAllowedLateness(Duration.millis(100));
-
- TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
- options.setValue(5);
-
- when(mockSideInputReader.contains(Matchers.<PCollectionView<Integer>>any())).thenReturn(true);
- when(mockSideInputReader.get(
- Matchers.<PCollectionView<Integer>>any(), any(BoundedWindow.class))).thenReturn(5);
-
- @SuppressWarnings({"rawtypes", "unchecked", "unused"})
- Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal())
- .thenReturn((WindowingStrategy) windowingStrategy);
-
- SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
- // Test basic execution of a trigger using a non-combining window set and discarding mode.
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- windowingStrategy, combineFn.<String>asKeyedFn(),
- VarIntCoder.of(), options, mockSideInputReader);
-
- injectElement(tester, 2);
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 3);
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 4);
-
- // This element shouldn't be seen, because the trigger has finished
- injectElement(tester, 6);
-
- assertThat(
- tester.extractOutput(),
- contains(
- isSingleWindowedValue(equalTo(5), 2, 0, 10),
- isSingleWindowedValue(equalTo(4), 4, 0, 10)));
- assertTrue(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
- }
-
- @Test
- public void testWatermarkHoldAndLateData() throws Exception {
- // Test handling of late data. Specifically, ensure the watermark hold is correct.
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // Input watermark -> null
- assertEquals(null, tester.getWatermarkHold());
- assertEquals(null, tester.getOutputWatermark());
-
- // All on time data, verify watermark hold.
- injectElement(tester, 1);
- injectElement(tester, 3);
- assertEquals(new Instant(1), tester.getWatermarkHold());
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 2);
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output, contains(
- isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
- 1, // timestamp
- 0, // window start
- 10))); // window end
- assertThat(output.get(0).getPane(),
- equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
-
- // Holding for the end-of-window transition.
- assertEquals(new Instant(9), tester.getWatermarkHold());
- // Nothing dropped.
- assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
-
- // Input watermark -> 4, output watermark should advance that far as well
- tester.advanceInputWatermark(new Instant(4));
- assertEquals(new Instant(4), tester.getOutputWatermark());
-
- // Some late, some on time. Verify that we only hold to the minimum of on-time.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(4));
- injectElement(tester, 2);
- injectElement(tester, 3);
- assertEquals(new Instant(9), tester.getWatermarkHold());
- injectElement(tester, 5);
- assertEquals(new Instant(5), tester.getWatermarkHold());
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 4);
- output = tester.extractOutput();
- assertThat(output,
- contains(
- isSingleWindowedValue(containsInAnyOrder(
- 1, 2, 3, // earlier firing
- 2, 3, 4, 5), // new elements
- 4, // timestamp
- 0, // window start
- 10))); // window end
- assertThat(output.get(0).getPane(),
- equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
-
- // All late -- output at end of window timestamp.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(8));
- injectElement(tester, 6);
- injectElement(tester, 5);
- assertEquals(new Instant(9), tester.getWatermarkHold());
- injectElement(tester, 4);
-
- // Fire the ON_TIME pane
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- tester.advanceInputWatermark(new Instant(10));
-
- // Output time is end of the window, because all the new data was late, but the pane
- // is the ON_TIME pane.
- output = tester.extractOutput();
- assertThat(output,
- contains(isSingleWindowedValue(
- containsInAnyOrder(1, 2, 3, // earlier firing
- 2, 3, 4, 5, // earlier firing
- 4, 5, 6), // new elements
- 9, // timestamp
- 0, // window start
- 10))); // window end
- assertThat(output.get(0).getPane(),
- equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));
-
- // This is "pending" at the time the watermark makes it way-late.
- // Because we're about to expire the window, we output it.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- injectElement(tester, 8);
- assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
-
- // Exceed the GC limit, triggering the last pane to be fired
- tester.advanceInputWatermark(new Instant(50));
- output = tester.extractOutput();
- // Output time is still end of the window, because the new data (8) was behind
- // the output watermark.
- assertThat(output,
- contains(isSingleWindowedValue(
- containsInAnyOrder(1, 2, 3, // earlier firing
- 2, 3, 4, 5, // earlier firing
- 4, 5, 6, // earlier firing
- 8), // new element prior to window becoming expired
- 9, // timestamp
- 0, // window start
- 10))); // window end
- assertThat(
- output.get(0).getPane(),
- equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1)));
- assertEquals(new Instant(50), tester.getOutputWatermark());
- assertEquals(null, tester.getWatermarkHold());
-
- // Late timers are ignored
- tester.fireTimer(new IntervalWindow(new Instant(0), new Instant(10)), new Instant(12),
- TimeDomain.EVENT_TIME);
-
- // And because we're past the end of window + allowed lateness, everything should be cleaned up.
- assertFalse(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor();
- }
-
- @Test
- public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
- // Make sure holds are only set if they are accompanied by an end-of-window timer.
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
- ClosingBehavior.FIRE_ALWAYS);
- tester.setAutoAdvanceOutputWatermark(false);
-
- // Case: Unobservably late
- tester.advanceInputWatermark(new Instant(15));
- tester.advanceOutputWatermark(new Instant(11));
- injectElement(tester, 14);
- // Hold was applied, waiting for end-of-window timer.
- assertEquals(new Instant(14), tester.getWatermarkHold());
- assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME));
-
- // Trigger the end-of-window timer.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- tester.advanceInputWatermark(new Instant(20));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- // Hold has been replaced with garbage collection hold. Waiting for garbage collection.
- assertEquals(new Instant(29), tester.getWatermarkHold());
- assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME));
-
- // Case: Maybe late 1
- injectElement(tester, 13);
- // No change to hold or timers.
- assertEquals(new Instant(29), tester.getWatermarkHold());
- assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME));
-
- // Trigger the garbage collection timer.
- tester.advanceInputWatermark(new Instant(30));
-
- // Everything should be cleaned up.
- assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
- tester.assertHasOnlyGlobalAndFinishedSetsFor();
- }
-
- @Test
- public void testPaneInfoAllStates() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- tester.advanceInputWatermark(new Instant(0));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 1);
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY))));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 2);
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.advanceInputWatermark(new Instant(15));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 3);
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(
- PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- injectElement(tester, 4);
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(
- PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 5);
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2))));
- }
-
- @Test
- public void testPaneInfoAllStatesAfterWatermark() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
- WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withTrigger(Repeatedly.forever(AfterFirst.of(
- AfterPane.elementCountAtLeast(2),
- AfterWatermark.pastEndOfWindow())))
- .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
- .withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
-
- tester.advanceInputWatermark(new Instant(0));
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(
- output,
- contains(WindowMatchers.valueWithPaneInfo(
- PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))));
- assertThat(
- output,
- contains(
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
-
- tester.advanceInputWatermark(new Instant(50));
-
- // We should get the ON_TIME pane even though it is empty,
- // because we have an AfterWatermark.pastEndOfWindow() trigger.
- output = tester.extractOutput();
- assertThat(
- output,
- contains(WindowMatchers.valueWithPaneInfo(
- PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0))));
- assertThat(
- output,
- contains(
- WindowMatchers.isSingleWindowedValue(emptyIterable(), 9, 0, 10)));
-
- // We should get the final pane even though it is empty.
- tester.advanceInputWatermark(new Instant(150));
- output = tester.extractOutput();
- assertThat(
- output,
- contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 2, 1))));
- assertThat(
- output,
- contains(
- WindowMatchers.isSingleWindowedValue(emptyIterable(), 9, 0, 10)));
- }
-
- @Test
- public void noEmptyPanesFinalIfNonEmpty() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
- WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
- AfterPane.elementCountAtLeast(2),
- AfterWatermark.pastEndOfWindow())))
- .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
- .withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
-
- tester.advanceInputWatermark(new Instant(0));
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)));
- tester.advanceInputWatermark(new Instant(20));
- tester.advanceInputWatermark(new Instant(250));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output, contains(
- // Trigger with 2 elements
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
- // Trigger for the empty on time pane
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
- }
-
- @Test
- public void noEmptyPanesFinalAlways() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
- WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
- AfterPane.elementCountAtLeast(2),
- AfterWatermark.pastEndOfWindow())))
- .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
- .withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
-
- tester.advanceInputWatermark(new Instant(0));
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)));
- tester.advanceInputWatermark(new Instant(20));
- tester.advanceInputWatermark(new Instant(250));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output, contains(
- // Trigger with 2 elements
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
- // Trigger for the empty on time pane
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10),
- // Trigger for the final pane
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
- }
-
- @Test
- public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
- WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withTrigger(Repeatedly.forever(AfterFirst.of(
- AfterPane.elementCountAtLeast(2),
- AfterWatermark.pastEndOfWindow())))
- .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
- .withAllowedLateness(Duration.millis(100))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
-
- tester.advanceInputWatermark(new Instant(0));
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(
- output,
- contains(WindowMatchers.valueWithPaneInfo(
- PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))));
- assertThat(
- output,
- contains(
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
-
- tester.advanceInputWatermark(new Instant(50));
-
- // We should get the ON_TIME pane even though it is empty,
- // because we have an AfterWatermark.pastEndOfWindow() trigger.
- output = tester.extractOutput();
- assertThat(
- output,
- contains(WindowMatchers.valueWithPaneInfo(
- PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0))));
- assertThat(
- output,
- contains(
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
-
- // We should get the final pane even though it is empty.
- tester.advanceInputWatermark(new Instant(150));
- output = tester.extractOutput();
- assertThat(
- output,
- contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 2, 1))));
- assertThat(
- output,
- contains(
- WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
- }
-
- @Test
- public void testPaneInfoFinalAndOnTime() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
- WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
- .withTrigger(
- Repeatedly.forever(AfterPane.elementCountAtLeast(2))
- .orFinally(AfterWatermark.pastEndOfWindow()))
- .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
- .withAllowedLateness(Duration.millis(100))
- .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
-
- tester.advanceInputWatermark(new Instant(0));
-
- // Should trigger due to element count
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
-
- assertThat(
- tester.extractOutput(),
- contains(WindowMatchers.valueWithPaneInfo(
- PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))));
-
- tester.advanceInputWatermark(new Instant(150));
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0))));
- }
-
- @Test
- public void testPaneInfoSkipToFinish() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- tester.advanceInputWatermark(new Instant(0));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 1);
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.EARLY))));
- }
-
- @Test
- public void testPaneInfoSkipToNonSpeculativeAndFinish() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- tester.advanceInputWatermark(new Instant(15));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 1);
- assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.LATE))));
- }
-
- @Test
- public void testMergeBeforeFinalizing() throws Exception {
- // Verify that we merge windows before producing output so users don't see undesired
- // unmerged windows.
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(0),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // All on time data, verify watermark hold.
- // These two windows should pre-merge immediately to [1, 20)
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)), // in [1, 11)
- TimestampedValue.of(10, new Instant(10))); // in [10, 20)
-
- // And this should fire the end-of-window timer
- tester.advanceInputWatermark(new Instant(100));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output.size(), equalTo(1));
- assertThat(output.get(0),
- isSingleWindowedValue(containsInAnyOrder(1, 10),
- 1, // timestamp
- 1, // window start
- 20)); // window end
- assertThat(
- output.get(0).getPane(),
- equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
- }
-
- /**
- * It is possible for a session window's trigger to be closed at the point at which
- * the (merged) session window is garbage collected. Make sure we don't accidentally
- * assume the window is still active.
- */
- @Test
- public void testMergingWithCloseBeforeGC() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // Two elements in two overlapping session windows.
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)), // in [1, 11)
- TimestampedValue.of(10, new Instant(10))); // in [10, 20)
-
- // Close the trigger, but the gargbage collection timer is still pending.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- tester.advanceInputWatermark(new Instant(30));
-
- // Now the garbage collection timer will fire, finding the trigger already closed.
- tester.advanceInputWatermark(new Instant(100));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output.size(), equalTo(1));
- assertThat(output.get(0),
- isSingleWindowedValue(containsInAnyOrder(1, 10),
- 1, // timestamp
- 1, // window start
- 20)); // window end
- assertThat(
- output.get(0).getPane(),
- equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
- }
-
- /**
- * Ensure a closed trigger has its state recorded in the merge result window.
- */
- @Test
- public void testMergingWithCloseTrigger() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // Create a new merged session window.
- tester.injectElements(TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)));
-
- // Force the trigger to be closed for the merged window.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- tester.advanceInputWatermark(new Instant(13));
-
- // Trigger is now closed.
- assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
-
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
-
- // Revisit the same session window.
- tester.injectElements(TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)));
-
- // Trigger is still closed.
- assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
- }
-
- /**
- * If a later event tries to reuse an earlier session window which has been closed, we
- * should reject that element and not fail due to the window no longer being active.
- */
- @Test
- public void testMergingWithReusedWindow() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // One elements in one session window.
- tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
-
- // Close the trigger, but the gargbage collection timer is still pending.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- tester.advanceInputWatermark(new Instant(15));
-
- // Another element in the same session window.
- // Should be discarded with 'window closed'.
- tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
-
- // And nothing should be left in the active window state.
- assertTrue(tester.hasNoActiveWindows());
-
- // Now the garbage collection timer will fire, finding the trigger already closed.
- tester.advanceInputWatermark(new Instant(100));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output.size(), equalTo(1));
- assertThat(output.get(0),
- isSingleWindowedValue(containsInAnyOrder(1),
- 1, // timestamp
- 1, // window start
- 11)); // window end
- assertThat(
- output.get(0).getPane(),
- equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
- }
-
- /**
- * When a merged window's trigger is closed we record that state using the merged window rather
- * than the original windows.
- */
- @Test
- public void testMergingWithClosedRepresentative() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // 2 elements into merged session window.
- // Close the trigger, but the garbage collection timer is still pending.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21.
- TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28.
-
- // More elements into the same merged session window.
- // It has not yet been gced.
- // Should be discarded with 'window closed'.
- tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21.
- TimestampedValue.of(2, new Instant(2)), // in [2, 12), gc at 22.
- TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28.
-
- // Now the garbage collection timer will fire, finding the trigger already closed.
- tester.advanceInputWatermark(new Instant(100));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
-
- assertThat(output.size(), equalTo(1));
- assertThat(output.get(0),
- isSingleWindowedValue(containsInAnyOrder(1, 8),
- 1, // timestamp
- 1, // window start
- 18)); // window end
- assertThat(
- output.get(0).getPane(),
- equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
- }
-
- /**
- * If an element for a closed session window ends up being merged into other still-open
- * session windows, the resulting session window is not 'poisoned'.
- */
- @Test
- public void testMergingWithClosedDoesNotPoison() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // 1 element, force its trigger to close.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- tester.injectElements(TimestampedValue.of(2, new Instant(2)));
-
- // 3 elements, one already closed.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
- tester.injectElements(TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)),
- TimestampedValue.of(3, new Instant(3)));
-
- tester.advanceInputWatermark(new Instant(100));
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output.size(), equalTo(2));
- assertThat(output.get(0),
- isSingleWindowedValue(containsInAnyOrder(2),
- 2, // timestamp
- 2, // window start
- 12)); // window end
- assertThat(
- output.get(0).getPane(),
- equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
- assertThat(output.get(1),
- isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
- 1, // timestamp
- 1, // window start
- 13)); // window end
- assertThat(
- output.get(1).getPane(),
- equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
- }
-
- /**
- * Tests that when data is assigned to multiple windows but some of those windows have
- * had their triggers finish, then the data is dropped and counted accurately.
- */
- @Test
- public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- WindowingStrategy.of(
- SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
- .withTrigger(AfterWatermark.pastEndOfWindow())
- .withAllowedLateness(Duration.millis(1000)),
- new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
-
- tester.injectElements(
- // assigned to [-60, 40), [-30, 70), [0, 100)
- TimestampedValue.of(10, new Instant(23)),
- // assigned to [-30, 70), [0, 100), [30, 130)
- TimestampedValue.of(12, new Instant(40)));
-
- assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
-
- tester.advanceInputWatermark(new Instant(70));
- tester.injectElements(
- // assigned to [-30, 70), [0, 100), [30, 130)
- // but [-30, 70) is closed by the trigger
- TimestampedValue.of(14, new Instant(60)));
-
- assertEquals(1, tester.getElementsDroppedDueToClosedWindow());
-
- tester.advanceInputWatermark(new Instant(130));
- // assigned to [-30, 70), [0, 100), [30, 130)
- // but they are all closed
- tester.injectElements(TimestampedValue.of(16, new Instant(40)));
-
- assertEquals(4, tester.getElementsDroppedDueToClosedWindow());
- }
-
- @Test
- public void testIdempotentEmptyPanesDiscarding() throws Exception {
- // Test uninteresting (empty) panes don't increment the index or otherwise
- // modify PaneInfo.
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // Inject a couple of on-time elements and fire at the window end.
- injectElement(tester, 1);
- injectElement(tester, 2);
- tester.advanceInputWatermark(new Instant(12));
-
- // Fire the on-time pane
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
-
- // Fire another timer (with no data, so it's an uninteresting pane that should not be output).
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
-
- // Finish it off with another datum.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 3);
-
- // The intermediate trigger firing shouldn't result in any output.
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output.size(), equalTo(2));
-
- // The on-time pane is as expected.
- assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10));
-
- // The late pane has the correct indices.
- assertThat(output.get(1).getValue(), contains(3));
- assertThat(
- output.get(1).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
-
- assertTrue(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
-
- assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
- }
-
- @Test
- public void testIdempotentEmptyPanesAccumulating() throws Exception {
- // Test uninteresting (empty) panes don't increment the index or otherwise
- // modify PaneInfo.
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- // Inject a couple of on-time elements and fire at the window end.
- injectElement(tester, 1);
- injectElement(tester, 2);
- tester.advanceInputWatermark(new Instant(12));
-
- // Trigger the on-time pane
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertThat(output.size(), equalTo(1));
- assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10));
- assertThat(output.get(0).getPane(),
- equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
-
- // Fire another timer with no data; the empty pane should not be output even though the
- // trigger is ready to fire
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
- assertThat(tester.extractOutput().size(), equalTo(0));
-
- // Finish it off with another datum, which is late
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
- injectElement(tester, 3);
- output = tester.extractOutput();
- assertThat(output.size(), equalTo(1));
-
- // The late pane has the correct indices.
- assertThat(output.get(0).getValue(), containsInAnyOrder(1, 2, 3));
- assertThat(output.get(0).getPane(),
- equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
-
- assertTrue(tester.isMarkedFinished(firstWindow));
- tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
-
- assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
- }
-
- /**
- * Test that we receive an empty on-time pane when an or-finally waiting for the watermark fires.
- * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
- * when the on-time pane is empty.
- */
- @Test
- public void testEmptyOnTimeFromOrFinally() throws Exception {
- ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
- AfterEach.<IntervalWindow>inOrder(
- Repeatedly
- .forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(5)))
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(25)))),
- AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(), Duration.millis(100));
-
- tester.advanceInputWatermark(new Instant(0));
- tester.advanceProcessingTime(new Instant(0));
-
- // Processing time timer for 5
- tester.injectElements(
- TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(1, new Instant(3)),
- TimestampedValue.of(1, new Instant(7)),
- TimestampedValue.of(1, new Instant(5)));
-
- // Should fire early pane
- tester.advanceProcessingTime(new Instant(6));
-
- // Should fire empty on time pane
- tester.advanceInputWatermark(new Instant(11));
- List<WindowedValue<Integer>> output = tester.extractOutput();
- assertEquals(2, output.size());
-
- assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
- assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));
-
- assertThat(
- output.get(0),
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
- assertThat(
- output.get(1),
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)));
- }
-
- /**
- * Tests for processing time firings after the watermark passes the end of the window.
- * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
- * when the on-time pane is non-empty.
- */
- @Test
- public void testProcessingTime() throws Exception {
- ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
- AfterEach.<IntervalWindow>inOrder(
- Repeatedly
- .forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(5)))
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(25)))),
- AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(), Duration.millis(100));
-
- tester.advanceInputWatermark(new Instant(0));
- tester.advanceProcessingTime(new Instant(0));
-
- tester.injectElements(TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(1, new Instant(3)), TimestampedValue.of(1, new Instant(7)),
- TimestampedValue.of(1, new Instant(5)));
- // 4 elements all at processing time 0
-
- tester.advanceProcessingTime(new Instant(6)); // fire [1,3,7,5] since 6 > 0 + 5
- tester.injectElements(
- TimestampedValue.of(1, new Instant(8)),
- TimestampedValue.of(1, new Instant(4)));
- // 6 elements
-
- tester.advanceInputWatermark(new Instant(11)); // fire [1,3,7,5,8,4] since 11 > 9
- tester.injectElements(
- TimestampedValue.of(1, new Instant(8)),
- TimestampedValue.of(1, new Instant(4)),
- TimestampedValue.of(1, new Instant(5)));
- // 9 elements
-
- tester.advanceInputWatermark(new Instant(12));
- tester.injectElements(
- TimestampedValue.of(1, new Instant(3)));
- // 10 elements
-
- tester.advanceProcessingTime(new Instant(15));
- tester.injectElements(
- TimestampedValue.of(1, new Instant(5)));
- // 11 elements
- tester.advanceProcessingTime(new Instant(32)); // fire since 32 > 6 + 25
-
- tester.injectElements(
- TimestampedValue.of(1, new Instant(3)));
- // 12 elements
- // fire [1,3,7,5,8,4,8,4,5,3,5,3] since 125 > 6 + 25
- tester.advanceInputWatermark(new Instant(125));
-
- List<WindowedValue<Integer>> output = tester.extractOutput();
- assertEquals(4, output.size());
-
- assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
- assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(6, 4, 0, 10));
- assertThat(output.get(2), WindowMatchers.isSingleWindowedValue(11, 9, 0, 10));
- assertThat(output.get(3), WindowMatchers.isSingleWindowedValue(12, 9, 0, 10));
-
- assertThat(
- output.get(0),
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
- assertThat(
- output.get(1),
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)));
- assertThat(
- output.get(2),
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 2, 1)));
- assertThat(
- output.get(3),
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 3, 2)));
- }
-
- /**
- * We should fire a non-empty ON_TIME pane in the GlobalWindow when the watermark moves to
- * end-of-time.
- */
- @Test
- public void fireNonEmptyOnDrainInGlobalWindow() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
- ReduceFnTester.nonCombining(
- WindowingStrategy.of(new GlobalWindows())
- .withTrigger(Repeatedly.<GlobalWindow>forever(
- AfterPane.elementCountAtLeast(3)))
- .withMode(AccumulationMode.DISCARDING_FIRED_PANES));
-
- tester.advanceInputWatermark(new Instant(0));
-
- final int n = 20;
- for (int i = 0; i < n; i++) {
- tester.injectElements(TimestampedValue.of(i, new Instant(i)));
- }
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertEquals(n / 3, output.size());
- for (int i = 0; i < output.size(); i++) {
- assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
- assertEquals(i, output.get(i).getPane().getIndex());
- assertEquals(3, Iterables.size(output.get(i).getValue()));
- }
-
- tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
- output = tester.extractOutput();
- assertEquals(1, output.size());
- assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
- assertEquals(n / 3, output.get(0).getPane().getIndex());
- assertEquals(n - ((n / 3) * 3), Iterables.size(output.get(0).getValue()));
- }
-
- /**
- * We should fire an empty ON_TIME pane in the GlobalWindow when the watermark moves to
- * end-of-time.
- */
- @Test
- public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
- ReduceFnTester.nonCombining(
- WindowingStrategy.of(new GlobalWindows())
- .withTrigger(Repeatedly.<GlobalWindow>forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(3))))
- .withMode(AccumulationMode.DISCARDING_FIRED_PANES));
-
- final int n = 20;
- for (int i = 0; i < n; i++) {
- tester.advanceProcessingTime(new Instant(i));
- tester.injectElements(TimestampedValue.of(i, new Instant(i)));
- }
- tester.advanceProcessingTime(new Instant(n + 4));
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertEquals((n + 3) / 4, output.size());
- for (int i = 0; i < output.size(); i++) {
- assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
- assertEquals(i, output.get(i).getPane().getIndex());
- assertEquals(4, Iterables.size(output.get(i).getValue()));
- }
-
- tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
- output = tester.extractOutput();
- assertEquals(1, output.size());
- assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
- assertEquals((n + 3) / 4, output.get(0).getPane().getIndex());
- assertEquals(0, Iterables.size(output.get(0).getValue()));
- }
-
- /**
- * Late elements should still have a garbage collection hold set so that they
- * can make a late pane rather than be dropped due to lateness.
- */
- @Test
- public void setGarbageCollectionHoldOnLateElements() throws Exception {
- ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(
- FixedWindows.of(Duration.millis(10)),
- AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(2)),
- AccumulationMode.DISCARDING_FIRED_PANES,
- Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
-
- tester.advanceInputWatermark(new Instant(0));
- tester.advanceOutputWatermark(new Instant(0));
- tester.injectElements(TimestampedValue.of(1, new Instant(1)));
-
- // Fire ON_TIME pane @ 9 with 1
-
- tester.advanceInputWatermark(new Instant(109));
- tester.advanceOutputWatermark(new Instant(109));
- tester.injectElements(TimestampedValue.of(2, new Instant(2)));
- // We should have set a garbage collection hold for the final pane.
- Instant hold = tester.getWatermarkHold();
- assertEquals(new Instant(109), hold);
-
- tester.advanceInputWatermark(new Instant(110));
- tester.advanceOutputWatermark(new Instant(110));
-
- // Fire final LATE pane @ 9 with 2
-
- List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
- assertEquals(2, output.size());
- }
-
- private static class SumAndVerifyContextFn extends CombineFnWithContext<Integer, int[], Integer> {
-
- private final PCollectionView<Integer> view;
- private final int expectedValue;
-
- private SumAndVerifyContextFn(PCollectionView<Integer> view, int expectedValue) {
- this.view = view;
- this.expectedValue = expectedValue;
- }
- @Override
- public int[] createAccumulator(Context c) {
- checkArgument(
- c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- checkArgument(c.sideInput(view) == expectedValue);
- return wrap(0);
- }
-
- @Override
- public int[] addInput(int[] accumulator, Integer input, Context c) {
- checkArgument(
- c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- checkArgument(c.sideInput(view) == expectedValue);
- accumulator[0] += input.intValue();
- return accumulator;
- }
-
- @Override
- public int[] mergeAccumulators(Iterable<int[]> accumulators, Context c) {
- checkArgument(
- c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- checkArgument(c.sideInput(view) == expectedValue);
- Iterator<int[]> iter = accumulators.iterator();
- if (!iter.hasNext()) {
- return createAccumulator(c);
- } else {
- int[] running = iter.next();
- while (iter.hasNext()) {
- running[0] += iter.next()[0];
- }
- return running;
- }
- }
-
- @Override
- public Integer extractOutput(int[] accumulator, Context c) {
- checkArgument(
- c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
- checkArgument(c.sideInput(view) == expectedValue);
- return accumulator[0];
- }
-
- private int[] wrap(int value) {
- return new int[] { value };
- }
- }
-
- /**
- * A {@link PipelineOptions} to test combining with context.
- */
- public interface TestOptions extends PipelineOptions {
- Integer getValue();
- void setValue(Integer value);
- }
-}