You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/26 16:44:05 UTC
[27/50] [abbrv] incubator-beam git commit: Remove pieces of Trigger
now owned by TriggerStateMachine
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
deleted file mode 100644
index a66f74f..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.AfterAll;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-
-/**
- * Generalized tests for {@link FinishedTriggers} implementations.
- */
-public class FinishedTriggersProperties {
- /**
- * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set
- * finished, it is correctly reported as finished.
- */
- public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertFalse(finishedSet.isFinished(trigger));
- finishedSet.setFinished(trigger, true);
- assertTrue(finishedSet.isFinished(trigger));
- }
-
- /**
- * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly
- * reported as finished.
- */
- public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
- ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
- AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
- AfterAll.of(
- AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
- verifyGetAfterSet(finishedSet, trigger);
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1));
- verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no
- * others.
- */
- public static void verifyClearRecursively(FinishedTriggers finishedSet) {
- ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
- AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
- AfterAll.of(
- AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
-
- // Set them all finished. This method is not on a trigger as it makes no sense outside tests.
- setFinishedRecursively(finishedSet, trigger);
- assertTrue(finishedSet.isFinished(trigger));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
- assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
-
- // Clear just the second AfterAll
- finishedSet.clearRecursively(trigger.subTriggers().get(1));
-
- // Check that the root and the first subtrigger tree are still finished, and the second is not
- assertTrue(finishedSet.isFinished(trigger));
- verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
- verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
- }
-
- private static void setFinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- finishedSet.setFinished(trigger, true);
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- setFinishedRecursively(finishedSet, subTrigger);
- }
- }
-
- private static void verifyFinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertTrue(finishedSet.isFinished(trigger));
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- verifyFinishedRecursively(finishedSet, subTrigger);
- }
- }
-
- private static void verifyUnfinishedRecursively(
- FinishedTriggers finishedSet, ExecutableTrigger trigger) {
- assertFalse(finishedSet.isFinished(trigger));
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- verifyUnfinishedRecursively(finishedSet, subTrigger);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
deleted file mode 100644
index 072d264..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-
-import java.util.HashSet;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link FinishedTriggersSet}.
- */
-@RunWith(JUnit4.class)
-public class FinishedTriggersSetTest {
- /**
- * Tests that after a trigger is set to finished, it reads back as finished.
- */
- @Test
- public void testSetGet() {
- FinishedTriggersProperties.verifyGetAfterSet(
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
- }
-
- /**
- * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no
- * others.
- */
- @Test
- public void testClearRecursively() {
- FinishedTriggersProperties.verifyClearRecursively(
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
- }
-
- @Test
- public void testCopy() throws Exception {
- FinishedTriggersSet finishedSet =
- FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
- assertThat(finishedSet.copy().getFinishedTriggers(),
- not(theInstance(finishedSet.getFinishedTriggers())));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
index 83077f4..63c71ed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java
@@ -18,15 +18,9 @@
package org.apache.beam.sdk.util;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -43,23 +37,6 @@ public class ReshuffleTriggerTest {
}
@Test
- public void testShouldFire() throws Exception {
- TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
- new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
- IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400));
- assertTrue(tester.shouldFire(arbitraryWindow));
- }
-
- @Test
- public void testOnTimer() throws Exception {
- TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
- new ReshuffleTrigger<IntervalWindow>(), FixedWindows.of(Duration.millis(100)));
- IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200));
- tester.fireIfShouldFire(arbitraryWindow);
- assertFalse(tester.isMarkedFinished(arbitraryWindow));
- }
-
- @Test
public void testToString() {
Trigger trigger = new ReshuffleTrigger<>();
assertEquals("ReshuffleTrigger()", trigger.toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
deleted file mode 100644
index 5fe17ad..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Test utility that runs a {@link Trigger}, using in-memory stub implementation to provide
- * the {@link StateInternals}.
- *
- * @param <W> The type of windows being used.
- */
-public class TriggerTester<InputT, W extends BoundedWindow> {
-
- /**
- * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps
- * can be conflated. Today, triggers should not observe the element type, so this is the
- * only trigger tester that needs to be used.
- */
- public static class SimpleTriggerTester<W extends BoundedWindow>
- extends TriggerTester<Integer, W> {
-
- private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
- super(windowingStrategy);
- }
-
- public void injectElements(int... values) throws Exception {
- List<TimestampedValue<Integer>> timestampedValues =
- Lists.newArrayListWithCapacity(values.length);
- for (int value : values) {
- timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
- }
- injectElements(timestampedValues);
- }
-
- public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception {
- return new SimpleTriggerTester<>(
- windowingStrategy.withAllowedLateness(allowedLateness));
- }
- }
-
- protected final WindowingStrategy<Object, W> windowingStrategy;
-
- private final TestInMemoryStateInternals<?> stateInternals =
- new TestInMemoryStateInternals<Object>(null /* key */);
- private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
- private final TriggerContextFactory<W> contextFactory;
- private final WindowFn<Object, W> windowFn;
- private final ActiveWindowSet<W> activeWindows;
- private final Map<W, W> windowToMergeResult;
-
- /**
- * An {@link ExecutableTrigger} built from the {@link Trigger}
- * under test.
- */
- private final ExecutableTrigger executableTrigger;
-
- /**
- * A map from a window and trigger to whether that trigger is finished for the window.
- */
- private final Map<W, FinishedTriggers> finishedSets;
-
- public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
- Trigger trigger, WindowFn<Object, W> windowFn)
- throws Exception {
- WindowingStrategy<Object, W> windowingStrategy =
- WindowingStrategy.of(windowFn).withTrigger(trigger)
- // Merging requires accumulation mode or early firings can break up a session.
- // Not currently an issue with the tester (because we never GC) but we don't want
- // mystery failures due to violating this need.
- .withMode(windowFn.isNonMerging()
- ? AccumulationMode.DISCARDING_FIRED_PANES
- : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
- return new SimpleTriggerTester<>(windowingStrategy);
- }
-
- public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger(
- Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
- WindowingStrategy<Object, W> strategy =
- WindowingStrategy.of(windowFn).withTrigger(trigger)
- // Merging requires accumulation mode or early firings can break up a session.
- // Not currently an issue with the tester (because we never GC) but we don't want
- // mystery failures due to violating this need.
- .withMode(windowFn.isNonMerging()
- ? AccumulationMode.DISCARDING_FIRED_PANES
- : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
- return new TriggerTester<>(strategy);
- }
-
- protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
- this.windowingStrategy = windowingStrategy;
- this.windowFn = windowingStrategy.getWindowFn();
- this.executableTrigger = windowingStrategy.getTrigger();
- this.finishedSets = new HashMap<>();
-
- this.activeWindows =
- windowFn.isNonMerging()
- ? new NonMergingActiveWindowSet<W>()
- : new MergingActiveWindowSet<W>(windowFn, stateInternals);
- this.windowToMergeResult = new HashMap<>();
-
- this.contextFactory =
- new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows);
- }
-
- /**
- * Instructs the trigger to clear its state for the given window.
- */
- public void clearState(W window) throws Exception {
- executableTrigger.invokeClear(contextFactory.base(window,
- new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window)));
- }
-
- /**
- * Asserts that the trigger has actually cleared all of its state for the given window. Since
- * the trigger under test is the root, this makes the assert for all triggers regardless
- * of their position in the trigger tree.
- */
- public void assertCleared(W window) {
- for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) {
- if (untypedNamespace instanceof WindowAndTriggerNamespace) {
- @SuppressWarnings("unchecked")
- WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace;
- if (namespace.getWindow().equals(window)) {
- Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
- assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty());
- }
- }
- }
- }
-
- /**
- * Returns {@code true} if the {@link Trigger} under test is finished for the given window.
- */
- public boolean isMarkedFinished(W window) {
- FinishedTriggers finishedSet = finishedSets.get(window);
- if (finishedSet == null) {
- return false;
- }
-
- return finishedSet.isFinished(executableTrigger);
- }
-
- private StateNamespace windowNamespace(W window) {
- return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window));
- }
-
- /**
- * Advance the input watermark to the specified time, then advance the output watermark as far as
- * possible.
- */
- public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
- // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
- timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark);
- }
-
- /** Advance the processing time to the specified time. */
- public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
- // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
- timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime);
- }
-
- /**
- * Inject all the timestamped values (after passing through the window function) as if they
- * arrived in a single chunk of a bundle (or work-unit).
- */
- @SafeVarargs
- public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
- injectElements(Arrays.asList(values));
- }
-
- public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception {
- for (TimestampedValue<InputT> value : values) {
- WindowTracing.trace("TriggerTester.injectElements: {}", value);
- }
-
- List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size());
-
- for (TimestampedValue<InputT> input : values) {
- try {
- InputT value = input.getValue();
- Instant timestamp = input.getTimestamp();
- Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(
- windowFn, value, timestamp, GlobalWindow.INSTANCE));
-
- for (W window : assignedWindows) {
- activeWindows.addActiveForTesting(window);
-
- // Today, triggers assume onTimer firing at the watermark time, whether or not they
- // explicitly set the timer themselves. So this tester must set it.
- timerInternals.setTimer(
- TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
- }
-
- windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- for (WindowedValue<InputT> windowedValue : windowedValues) {
- for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
- // SDK is responsible for type safety
- @SuppressWarnings("unchecked")
- W window = mergeResult((W) untypedWindow);
-
- Trigger.OnElementContext context = contextFactory.createOnElementContext(window,
- new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(),
- executableTrigger, getFinishedSet(window));
-
- if (!context.trigger().isFinished()) {
- executableTrigger.invokeOnElement(context);
- }
- }
- }
- }
-
- public boolean shouldFire(W window) throws Exception {
- Trigger.TriggerContext context = contextFactory.base(
- window,
- new TestTimers(windowNamespace(window)),
- executableTrigger, getFinishedSet(window));
- executableTrigger.getSpec().prefetchShouldFire(context.state());
- return executableTrigger.invokeShouldFire(context);
- }
-
- public void fireIfShouldFire(W window) throws Exception {
- Trigger.TriggerContext context = contextFactory.base(
- window,
- new TestTimers(windowNamespace(window)),
- executableTrigger, getFinishedSet(window));
-
- executableTrigger.getSpec().prefetchShouldFire(context.state());
- if (executableTrigger.invokeShouldFire(context)) {
- executableTrigger.getSpec().prefetchOnFire(context.state());
- executableTrigger.invokeOnFire(context);
- if (context.trigger().isFinished()) {
- activeWindows.remove(window);
- executableTrigger.invokeClear(context);
- }
- }
- }
-
- public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) {
- getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value);
- }
-
- /**
- * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge
- * events on to the trigger under test. Does not persist the fact that merging happened,
- * since it is just to test the trigger's {@code OnMerge} method.
- */
- public final void mergeWindows() throws Exception {
- windowToMergeResult.clear();
- activeWindows.merge(new MergeCallback<W>() {
- @Override
- public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {}
-
- @Override
- public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
- List<W> activeToBeMerged = new ArrayList<W>();
- for (W window : toBeMerged) {
- windowToMergeResult.put(window, mergeResult);
- if (activeWindows.isActive(window)) {
- activeToBeMerged.add(window);
- }
- }
- Map<W, FinishedTriggers> mergingFinishedSets =
- Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
- for (W oldWindow : activeToBeMerged) {
- mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
- }
- executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
- new TestTimers(windowNamespace(mergeResult)), executableTrigger,
- getFinishedSet(mergeResult), mergingFinishedSets));
- timerInternals.setTimer(TimerData.of(
- windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
- }
- });
- }
-
- public W mergeResult(W window) {
- W result = windowToMergeResult.get(window);
- return result == null ? window : result;
- }
-
- private FinishedTriggers getFinishedSet(W window) {
- FinishedTriggers finishedSet = finishedSets.get(window);
- if (finishedSet == null) {
- finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
- finishedSets.put(window, finishedSet);
- }
- return finishedSet;
- }
-
- private static class TestAssignContext<W extends BoundedWindow>
- extends WindowFn<Object, W>.AssignContext {
- private Object element;
- private Instant timestamp;
- private BoundedWindow window;
-
- public TestAssignContext(
- WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
- windowFn.super();
- this.element = element;
- this.timestamp = timestamp;
- this.window = window;
- }
-
- @Override
- public Object element() {
- return element;
- }
-
- @Override
- public Instant timestamp() {
- return timestamp;
- }
-
- @Override
- public BoundedWindow window() {
- return window;
- }
- }
-
- private class TestTimers implements Timers {
- private final StateNamespace namespace;
-
- public TestTimers(StateNamespace namespace) {
- checkArgument(namespace instanceof WindowNamespace);
- this.namespace = namespace;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timerInternals.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timerInternals.currentSynchronizedProcessingTime();
- }
-
- @Override
- public Instant currentEventTime() {
- return timerInternals.currentInputWatermarkTime();
- }
- }
-}