You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 19:51:56 UTC
[1/4] incubator-beam git commit: Translate ReshuffleTrigger to
ReshuffleTriggerStateMachine
Repository: incubator-beam
Updated Branches:
refs/heads/master 472cf0ec0 -> a69f888e8
Translate ReshuffleTrigger to ReshuffleTriggerStateMachine
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be07065f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be07065f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be07065f
Branch: refs/heads/master
Commit: be07065fe6ae404cc36d21c95ff5025e0ed4233c
Parents: 33d9baa
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 10:14:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:41:04 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/triggers/TriggerStateMachines.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be07065f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
index 317e3b9..f19f3cf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ReshuffleTrigger;
import org.apache.beam.sdk.util.TimeDomain;
import org.joda.time.Instant;
@@ -100,6 +101,10 @@ public class TriggerStateMachines {
return DefaultTriggerStateMachine.of();
}
+ private TriggerStateMachine evaluateSpecific(ReshuffleTrigger v) {
+ return new ReshuffleTriggerStateMachine();
+ }
+
private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) {
return AfterWatermarkStateMachine.pastEndOfWindow();
}
[2/4] incubator-beam git commit: Delete TriggerRunner
Posted by ke...@apache.org.
Delete TriggerRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33d9baaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33d9baaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33d9baaf
Branch: refs/heads/master
Commit: 33d9baaf5778c565632f6fe98344b8f1bd8a1d75
Parents: 3237440
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 17:41:39 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:41:04 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/TriggerRunner.java | 247 -------------------
1 file changed, 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33d9baaf/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
deleted file mode 100644
index 8d0f322..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.BitSetCoder;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.FinishedTriggers;
-import org.apache.beam.sdk.util.FinishedTriggersBitSet;
-import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.TriggerContextFactory;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.joda.time.Instant;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- * <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- * constructing the appropriate trigger contexts.</li>
- * <li>Committing a record of which subtriggers are finished to persistent state.</li>
- * <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- * <li>Clearing out the persisted finished set when a caller indicates
- * (via {#link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
- @VisibleForTesting
- static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
- StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
- private final ExecutableTrigger rootTrigger;
- private final TriggerContextFactory<W> contextFactory;
-
- public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
- checkState(rootTrigger.getTriggerIndex() == 0);
- this.rootTrigger = rootTrigger;
- this.contextFactory = contextFactory;
- }
-
- private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
- if (!isFinishedSetNeeded()) {
- // If no trigger in the tree will ever have finished bits, then we don't need to read them.
- // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
- // finished) for each trigger in the tree.
- return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
- }
-
- BitSet bitSet = state.read();
- return bitSet == null
- ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
- : FinishedTriggersBitSet.fromBitSet(bitSet);
- }
-
-
- private void clearFinishedBits(ValueState<BitSet> state) {
- if (!isFinishedSetNeeded()) {
- // Nothing to clear.
- return;
- }
- state.clear();
- }
-
- /** Return true if the trigger is closed in the window corresponding to the specified state. */
- public boolean isClosed(StateAccessor<?> state) {
- return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
- }
-
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
- justification = "prefetch side effect")
- public void prefetchForValue(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchOnElement(
- contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
- justification = "prefetch side effect")
- public void prefetchOnFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
- justification = "prefetch side effect")
- public void prefetchShouldFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchShouldFire(
- contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- /**
- * Run the trigger logic to deal with a new value.
- */
- public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
- throws Exception {
- // Clone so that we can detect changes and so that changes here don't pollute merging.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
- window, timers, timestamp, rootTrigger, finishedSet);
- rootTrigger.invokeOnElement(triggerContext);
- persistFinishedSet(state, finishedSet);
- }
-
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
- justification = "prefetch side effect")
- public void prefetchForMerge(
- W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
- if (isFinishedSetNeeded()) {
- for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
- value.readLater();
- }
- }
- rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
- window, mergingWindows, rootTrigger));
- }
-
- /**
- * Run the trigger merging logic as part of executing the specified merge.
- */
- public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
- // Clone so that we can detect changes and so that changes here don't pollute merging.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
- // And read the finished bits in each merging window.
- ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
- for (Map.Entry<W, ValueState<BitSet>> entry :
- state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
- // Don't need to clone these, since the trigger context doesn't allow modification
- builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
- // Clear the underlying finished bits.
- clearFinishedBits(entry.getValue());
- }
- ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
- Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
- window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
- // Run the merge from the trigger
- rootTrigger.invokeOnMerge(mergeContext);
-
- persistFinishedSet(state, finishedSet);
- }
-
- public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
- FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger.TriggerContext context = contextFactory.base(window, timers,
- rootTrigger, finishedSet);
- return rootTrigger.invokeShouldFire(context);
- }
-
- public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
- // shouldFire should be false.
- // However it is too expensive to assert.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger.TriggerContext context = contextFactory.base(window, timers,
- rootTrigger, finishedSet);
- rootTrigger.invokeOnFire(context);
- persistFinishedSet(state, finishedSet);
- }
-
- private void persistFinishedSet(
- StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
- if (!isFinishedSetNeeded()) {
- return;
- }
-
- ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
- if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
- if (modifiedFinishedSet.getBitSet().isEmpty()) {
- finishedSetState.clear();
- } else {
- finishedSetState.write(modifiedFinishedSet.getBitSet());
- }
- }
- }
-
- /**
- * Clear the finished bits.
- */
- public void clearFinished(StateAccessor<?> state) {
- clearFinishedBits(state.access(FINISHED_BITS_TAG));
- }
-
- /**
- * Clear the state used for executing triggers, but leave the finished set to indicate
- * the window is closed.
- */
- public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
- // Don't need to clone, because we'll be clearing the finished bits anyways.
- FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
- rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
- }
-
- private boolean isFinishedSetNeeded() {
- // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
- // lookup. Right now, we special case this for the DefaultTrigger.
- return !(rootTrigger.getSpec() instanceof DefaultTrigger);
- }
-}
[4/4] incubator-beam git commit: This closes #1138
Posted by ke...@apache.org.
This closes #1138
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a69f888e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a69f888e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a69f888e
Branch: refs/heads/master
Commit: a69f888e830c8998ca8e29eada054f7b938918d6
Parents: 472cf0e be07065
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 12:43:11 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:43:11 2016 -0700
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 5 +
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 5 +
.../beam/runners/core/ReduceFnRunner.java | 37 +--
.../apache/beam/runners/core/TriggerRunner.java | 247 ----------------
.../triggers/TriggerStateMachineRunner.java | 2 +-
.../core/triggers/TriggerStateMachines.java | 5 +
.../beam/runners/core/ReduceFnRunnerTest.java | 281 +++++++++++--------
.../beam/runners/core/ReduceFnTester.java | 157 ++++++++---
8 files changed, 318 insertions(+), 421 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-beam git commit: Port ReduceFnRunner to
TriggerStateMachine
Posted by ke...@apache.org.
Port ReduceFnRunner to TriggerStateMachine
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3237440e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3237440e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3237440e
Branch: refs/heads/master
Commit: 3237440e2120555a90d74dfaae1d7b44b2d17203
Parents: 472cf0e
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 16:48:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:41:04 2016 -0700
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 5 +
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 5 +
.../beam/runners/core/ReduceFnRunner.java | 37 +--
.../triggers/TriggerStateMachineRunner.java | 2 +-
.../beam/runners/core/ReduceFnRunnerTest.java | 281 +++++++++++--------
.../beam/runners/core/ReduceFnTester.java | 157 ++++++++---
6 files changed, 313 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index b427037..75a5aa7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.core;
import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
@@ -81,6 +83,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
new ReduceFnRunner<>(
key,
windowingStrategy,
+ ExecutableTriggerStateMachine.create(
+ TriggerStateMachines.stateMachineForTrigger(
+ windowingStrategy.getTrigger().getSpec())),
stateInternals,
timerInternals,
c.windowingInternals(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 23986df..4dea775 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.core;
import com.google.common.collect.Iterables;
import java.util.List;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -70,6 +72,9 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
new ReduceFnRunner<K, InputT, OutputT, W>(
key,
strategy,
+ ExecutableTriggerStateMachine.create(
+ TriggerStateMachines.stateMachineForTrigger(
+ strategy.getTrigger().getSpec())),
stateInternals,
timerInternals,
c.windowingInternals(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 24d472b..78c4e0b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -34,6 +34,9 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks;
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
+import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -50,7 +53,6 @@ import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TriggerContextFactory;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -69,25 +71,25 @@ import org.joda.time.Instant;
* Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
* {@link PCollection} by key.
*
- * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of
- * the triggering logic. The {@code ReduceFnRunner}s responsibilities are:
+ * <p>The {@link #onTrigger} relies on a {@link TriggerStateMachineRunner} to manage the execution
+ * of the triggering logic. The {@code ReduceFnRunner}s responsibilities are:
*
* <ul>
- * <li>Tracking the windows that are active (have buffered data) as elements arrive and
- * triggers are fired.
- * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
- * when the trigger fires.
+ * <li>Tracking the windows that are active (have buffered data) as elements arrive and triggers are
+ * fired.
+ * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it when the
+ * trigger fires.
* <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
- * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions
- * such as output.
+ * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions such as
+ * output.
* <li>Scheduling garbage collection of state associated with a specific window, and making that
- * happen when the appropriate timer fires.
+ * happen when the appropriate timer fires.
* </ul>
*
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of values associated with the key.
* @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
+ * @param <W> The type of windows this operates on.
*/
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements TimerCallback {
@@ -165,7 +167,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
* garbage collected.
* </ul>
*/
- private final TriggerRunner<W> triggerRunner;
+ private final TriggerStateMachineRunner<W> triggerRunner;
/**
* Store the output watermark holds for each window.
@@ -212,6 +214,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
public ReduceFnRunner(
K key,
WindowingStrategy<?, W> windowingStrategy,
+ ExecutableTriggerStateMachine triggerStateMachine,
StateInternals<K> stateInternals,
TimerInternals timerInternals,
WindowingInternals<?, KV<K, OutputT>> windowingInternals,
@@ -242,9 +245,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
this.triggerRunner =
- new TriggerRunner<>(
- windowingStrategy.getTrigger(),
- new TriggerContextFactory<>(
+ new TriggerStateMachineRunner<>(
+ triggerStateMachine,
+ new TriggerStateMachineContextFactory<>(
windowingStrategy.getWindowFn(), stateInternals, activeWindows));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 0ffbbca..9f03216 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
*/
public class TriggerStateMachineRunner<W extends BoundedWindow> {
@VisibleForTesting
- static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+ public static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
private final ExecutableTriggerStateMachine rootTrigger;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4d5680c..20eb08b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -38,6 +38,7 @@ import static org.mockito.Mockito.withSettings;
import com.google.common.collect.Iterables;
import java.util.Iterator;
import java.util.List;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.sdk.WindowMatchers;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -94,23 +95,23 @@ import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
public class ReduceFnRunnerTest {
@Mock private SideInputReader mockSideInputReader;
- private Trigger mockTrigger;
+ private TriggerStateMachine mockTriggerStateMachine;
private PCollectionView<Integer> mockView;
private IntervalWindow firstWindow;
- private static Trigger.TriggerContext anyTriggerContext() {
- return Mockito.<Trigger.TriggerContext>any();
+ private static TriggerStateMachine.TriggerContext anyTriggerContext() {
+ return Mockito.<TriggerStateMachine.TriggerContext>any();
}
- private static Trigger.OnElementContext anyElementContext() {
- return Mockito.<Trigger.OnElementContext>any();
+ private static TriggerStateMachine.OnElementContext anyElementContext() {
+ return Mockito.<TriggerStateMachine.OnElementContext>any();
}
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- mockTrigger = mock(Trigger.class, withSettings().serializable());
+ mockTriggerStateMachine = mock(TriggerStateMachine.class, withSettings().serializable());
@SuppressWarnings("unchecked")
PCollectionView<Integer> mockViewUnchecked =
@@ -121,17 +122,17 @@ public class ReduceFnRunnerTest {
private void injectElement(ReduceFnTester<Integer, ?, IntervalWindow> tester, int element)
throws Exception {
- doNothing().when(mockTrigger).onElement(anyElementContext());
+ doNothing().when(mockTriggerStateMachine).onElement(anyElementContext());
tester.injectElements(TimestampedValue.of(element, new Instant(element)));
}
- private void triggerShouldFinish(Trigger mockTrigger) throws Exception {
+ private void triggerShouldFinish(TriggerStateMachine mockTrigger) throws Exception {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Exception {
@SuppressWarnings("unchecked")
- Trigger.TriggerContext context =
- (Trigger.TriggerContext) invocation.getArguments()[0];
+ TriggerStateMachine.TriggerContext context =
+ (TriggerStateMachine.TriggerContext) invocation.getArguments()[0];
context.trigger().setFinished(true);
return null;
}
@@ -143,20 +144,20 @@ public class ReduceFnRunnerTest {
public void testOnElementBufferingDiscarding() throws Exception {
// Test basic execution of a trigger using a non-combining window set and discarding mode.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
// Pane of {1, 2}
injectElement(tester, 1);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 2);
assertThat(tester.extractOutput(),
contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
// Pane of just 3, and finish
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 3);
assertThat(tester.extractOutput(),
contains(isSingleWindowedValue(containsInAnyOrder(3), 3, 0, 10)));
@@ -173,19 +174,22 @@ public class ReduceFnRunnerTest {
public void testOnElementBufferingAccumulating() throws Exception {
// Test basic execution of a trigger using a non-combining window set and accumulating mode.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
+ ReduceFnTester.nonCombining(
+ FixedWindows.of(Duration.millis(10)),
+ mockTriggerStateMachine,
+ AccumulationMode.ACCUMULATING_FIRED_PANES,
+ Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
injectElement(tester, 1);
// Fires {1, 2}
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 2);
// Fires {1, 2, 3} because we are in accumulating mode
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 3);
// This element shouldn't be seen, because the trigger has finished
@@ -203,17 +207,27 @@ public class ReduceFnRunnerTest {
@Test
public void testOnElementCombiningDiscarding() throws Exception {
// Test basic execution of a trigger using a non-combining window set and discarding mode.
- ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- FixedWindows.of(Duration.millis(10)), mockTrigger, AccumulationMode.DISCARDING_FIRED_PANES,
- new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of(), Duration.millis(100));
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100));
+
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(
+ strategy,
+ mockTriggerStateMachine,
+ new Sum.SumIntegerFn().<String>asKeyedFn(),
+ VarIntCoder.of());
injectElement(tester, 2);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 3);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 4);
// This element shouldn't be seen, because the trigger has finished
@@ -267,14 +281,17 @@ public class ReduceFnRunnerTest {
window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
// Test basic execution of a trigger using a non-combining window set and accumulating mode.
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn)
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
+ .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+ .withAllowedLateness(allowedLateness);
+
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(
- windowFn,
- AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()),
- AccumulationMode.DISCARDING_FIRED_PANES,
- new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(),
- allowedLateness);
+ ReduceFnTester
+ .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
tester.injectElements(TimestampedValue.of(13, elementTimestamp));
@@ -295,18 +312,27 @@ public class ReduceFnRunnerTest {
@Test
public void testOnElementCombiningAccumulating() throws Exception {
// Test basic execution of a trigger using a non-combining window set and accumulating mode.
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100));
+
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), mockTrigger,
- AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(), Duration.millis(100));
+ ReduceFnTester.combining(
+ strategy,
+ mockTriggerStateMachine,
+ new Sum.SumIntegerFn().<String>asKeyedFn(),
+ VarIntCoder.of());
injectElement(tester, 1);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 2);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 3);
// This element shouldn't be seen, because the trigger has finished
@@ -326,7 +352,6 @@ public class ReduceFnRunnerTest {
Integer expectedValue = 5;
WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy
.of(FixedWindows.of(Duration.millis(10)))
- .withTrigger(mockTrigger)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withAllowedLateness(Duration.millis(100));
@@ -345,16 +370,16 @@ public class ReduceFnRunnerTest {
SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
// Test basic execution of a trigger using a non-combining window set and discarding mode.
ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
- windowingStrategy, combineFn.<String>asKeyedFn(),
+ windowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(),
VarIntCoder.of(), options, mockSideInputReader);
injectElement(tester, 2);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 3);
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 4);
// This element shouldn't be seen, because the trigger has finished
@@ -373,7 +398,7 @@ public class ReduceFnRunnerTest {
public void testWatermarkHoldAndLateData() throws Exception {
// Test handling of late data. Specifically, ensure the watermark hold is correct.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
ClosingBehavior.FIRE_IF_NON_EMPTY);
@@ -385,7 +410,7 @@ public class ReduceFnRunnerTest {
injectElement(tester, 1);
injectElement(tester, 3);
assertEquals(new Instant(1), tester.getWatermarkHold());
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 2);
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output, contains(
@@ -406,14 +431,14 @@ public class ReduceFnRunnerTest {
assertEquals(new Instant(4), tester.getOutputWatermark());
// Some late, some on time. Verify that we only hold to the minimum of on-time.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
tester.advanceInputWatermark(new Instant(4));
injectElement(tester, 2);
injectElement(tester, 3);
assertEquals(new Instant(9), tester.getWatermarkHold());
injectElement(tester, 5);
assertEquals(new Instant(5), tester.getWatermarkHold());
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 4);
output = tester.extractOutput();
assertThat(output,
@@ -428,7 +453,7 @@ public class ReduceFnRunnerTest {
equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
// All late -- output at end of window timestamp.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
tester.advanceInputWatermark(new Instant(8));
injectElement(tester, 6);
injectElement(tester, 5);
@@ -436,7 +461,7 @@ public class ReduceFnRunnerTest {
injectElement(tester, 4);
// Fire the ON_TIME pane
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
tester.advanceInputWatermark(new Instant(10));
// Output time is end of the window, because all the new data was late, but the pane
@@ -455,7 +480,7 @@ public class ReduceFnRunnerTest {
// This is "pending" at the time the watermark makes it way-late.
// Because we're about to expire the window, we output it.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
injectElement(tester, 8);
assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
@@ -492,7 +517,7 @@ public class ReduceFnRunnerTest {
public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
// Make sure holds are only set if they are accompanied by an end-of-window timer.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
ClosingBehavior.FIRE_ALWAYS);
tester.setAutoAdvanceOutputWatermark(false);
@@ -506,9 +531,9 @@ public class ReduceFnRunnerTest {
assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME));
// Trigger the end-of-window timer.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
tester.advanceInputWatermark(new Instant(20));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
// Hold has been replaced with garbage collection hold. Waiting for garbage collection.
assertEquals(new Instant(29), tester.getWatermarkHold());
assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME));
@@ -530,37 +555,37 @@ public class ReduceFnRunnerTest {
@Test
public void testPaneInfoAllStates() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
tester.advanceInputWatermark(new Instant(0));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 1);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY))));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 2);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
tester.advanceInputWatermark(new Instant(15));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 3);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(
PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 4);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(
PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 5);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2))));
@@ -758,13 +783,13 @@ public class ReduceFnRunnerTest {
@Test
public void testPaneInfoSkipToFinish() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
tester.advanceInputWatermark(new Instant(0));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 1);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.EARLY))));
@@ -773,13 +798,13 @@ public class ReduceFnRunnerTest {
@Test
public void testPaneInfoSkipToNonSpeculativeAndFinish() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
tester.advanceInputWatermark(new Instant(15));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 1);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.LATE))));
@@ -790,7 +815,8 @@ public class ReduceFnRunnerTest {
// Verify that we merge windows before producing output so users don't see undesired
// unmerged windows.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(0),
ClosingBehavior.FIRE_IF_NON_EMPTY);
@@ -823,7 +849,8 @@ public class ReduceFnRunnerTest {
@Test
public void testMergingWithCloseBeforeGC() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);
@@ -833,8 +860,8 @@ public class ReduceFnRunnerTest {
TimestampedValue.of(10, new Instant(10))); // in [10, 20)
// Close the trigger, but the gargbage collection timer is still pending.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
tester.advanceInputWatermark(new Instant(30));
// Now the garbage collection timer will fire, finding the trigger already closed.
@@ -858,7 +885,8 @@ public class ReduceFnRunnerTest {
@Test
public void testMergingWithCloseTrigger() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);
@@ -867,14 +895,14 @@ public class ReduceFnRunnerTest {
TimestampedValue.of(2, new Instant(2)));
// Force the trigger to be closed for the merged window.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
tester.advanceInputWatermark(new Instant(13));
// Trigger is now closed.
assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
// Revisit the same session window.
tester.injectElements(TimestampedValue.of(1, new Instant(1)),
@@ -891,7 +919,8 @@ public class ReduceFnRunnerTest {
@Test
public void testMergingWithReusedWindow() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);
@@ -899,8 +928,8 @@ public class ReduceFnRunnerTest {
tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
// Close the trigger, but the gargbage collection timer is still pending.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
tester.advanceInputWatermark(new Instant(15));
// Another element in the same session window.
@@ -932,14 +961,15 @@ public class ReduceFnRunnerTest {
@Test
public void testMergingWithClosedRepresentative() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);
// 2 elements into merged session window.
// Close the trigger, but the garbage collection timer is still pending.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21.
TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28.
@@ -973,17 +1003,18 @@ public class ReduceFnRunnerTest {
@Test
public void testMergingWithClosedDoesNotPoison() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+ mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);
// 1 element, force its trigger to close.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
tester.injectElements(TimestampedValue.of(2, new Instant(2)));
// 3 elements, one already closed.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
tester.injectElements(TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(2, new Instant(2)),
TimestampedValue.of(3, new Instant(3)));
@@ -1052,7 +1083,7 @@ public class ReduceFnRunnerTest {
// Test uninteresting (empty) panes don't increment the index or otherwise
// modify PaneInfo.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
@@ -1062,16 +1093,16 @@ public class ReduceFnRunnerTest {
tester.advanceInputWatermark(new Instant(12));
// Fire the on-time pane
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
// Fire another timer (with no data, so it's an uninteresting pane that should not be output).
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
// Finish it off with another datum.
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 3);
// The intermediate trigger firing shouldn't result in any output.
@@ -1097,7 +1128,7 @@ public class ReduceFnRunnerTest {
// Test uninteresting (empty) panes don't increment the index or otherwise
// modify PaneInfo.
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
- ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+ ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
ClosingBehavior.FIRE_IF_NON_EMPTY);
@@ -1107,7 +1138,7 @@ public class ReduceFnRunnerTest {
tester.advanceInputWatermark(new Instant(12));
// Trigger the on-time pane
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output.size(), equalTo(1));
@@ -1117,13 +1148,13 @@ public class ReduceFnRunnerTest {
// Fire another timer with no data; the empty pane should not be output even though the
// trigger is ready to fire
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
assertThat(tester.extractOutput().size(), equalTo(0));
// Finish it off with another datum, which is late
- when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
- triggerShouldFinish(mockTrigger);
+ when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTriggerStateMachine);
injectElement(tester, 3);
output = tester.extractOutput();
assertThat(output.size(), equalTo(1));
@@ -1146,19 +1177,25 @@ public class ReduceFnRunnerTest {
*/
@Test
public void testEmptyOnTimeFromOrFinally() throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTrigger(
+ AfterEach.<IntervalWindow>inOrder(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(5)))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(new Duration(25)))))
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100));
+
ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
- AfterEach.<IntervalWindow>inOrder(
- Repeatedly
- .forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(5)))
- .orFinally(AfterWatermark.pastEndOfWindow()),
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(25)))),
- AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(), Duration.millis(100));
+ ReduceFnTester
+ .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
@@ -1196,9 +1233,11 @@ public class ReduceFnRunnerTest {
*/
@Test
public void testProcessingTime() throws Exception {
- ReduceFnTester<Integer, Integer, IntervalWindow> tester =
- ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
- AfterEach.<IntervalWindow>inOrder(
+
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+ .withTrigger(AfterEach.<IntervalWindow>inOrder(
Repeatedly
.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
@@ -1206,9 +1245,13 @@ public class ReduceFnRunnerTest {
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
- new Duration(25)))),
- AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
- VarIntCoder.of(), Duration.millis(100));
+ new Duration(25)))))
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100));
+
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester
+ .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
tester.advanceInputWatermark(new Instant(0));
tester.advanceProcessingTime(new Instant(0));
@@ -1352,11 +1395,13 @@ public class ReduceFnRunnerTest {
public void setGarbageCollectionHoldOnLateElements() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
ReduceFnTester.nonCombining(
- FixedWindows.of(Duration.millis(10)),
- AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(2)),
- AccumulationMode.DISCARDING_FIRED_PANES,
- Duration.millis(100),
- ClosingBehavior.FIRE_IF_NON_EMPTY);
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+ .withTrigger(
+ AfterWatermark.pastEndOfWindow()
+ .withLateFirings(AfterPane.elementCountAtLeast(2)))
+ .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100))
+ .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
tester.advanceInputWatermark(new Instant(0));
tester.advanceOutputWatermark(new Instant(0));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 5752b11..f707349 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -38,6 +38,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -59,7 +63,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.ExecutableTrigger;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.SideInputReader;
@@ -106,6 +109,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
private final TestWindowingInternals windowingInternals;
private final Coder<OutputT> outputCoder;
private final WindowingStrategy<Object, W> objectStrategy;
+ private final ExecutableTriggerStateMachine executableTriggerStateMachine;
private final ReduceFn<String, InputT, OutputT, W> reduceFn;
private final PipelineOptions options;
@@ -117,38 +121,99 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
*/
private boolean autoAdvanceOutputWatermark = true;
- private ExecutableTrigger executableTrigger;
-
private final InMemoryLongSumAggregator droppedDueToClosedWindow =
new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+ /**
+ * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating
+ * a {@link TriggerStateMachine} from its {@link Trigger}.
+ */
public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
return new ReduceFnTester<Integer, Iterable<Integer>, W>(
windowingStrategy,
+ TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger().getSpec()),
SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
IterableCoder.of(VarIntCoder.of()),
PipelineOptionsFactory.create(),
NullSideInputReader.empty());
}
- public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
- nonCombining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
- Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception {
+ /**
+ * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and {@link
+ * TriggerStateMachine}, for mocking the interactions between {@link ReduceFnRunner} and the
+ * {@link TriggerStateMachine}.
+ *
+ * <p>Ignores the {@link Trigger} on the {@link WindowingStrategy}.
+ */
+ public static <W extends BoundedWindow>
+ ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
+ WindowingStrategy<?, W> windowingStrategy, TriggerStateMachine triggerStateMachine)
+ throws Exception {
+ return new ReduceFnTester<>(
+ windowingStrategy,
+ triggerStateMachine,
+ SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
+ IterableCoder.of(VarIntCoder.of()),
+ PipelineOptionsFactory.create(),
+ NullSideInputReader.empty());
+ }
+
+ public static <W extends BoundedWindow>
+ ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
+ WindowFn<?, W> windowFn,
+ TriggerStateMachine triggerStateMachine,
+ AccumulationMode mode,
+ Duration allowedDataLateness,
+ ClosingBehavior closingBehavior)
+ throws Exception {
WindowingStrategy<?, W> strategy =
WindowingStrategy.of(windowFn)
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withTrigger(trigger)
.withMode(mode)
.withAllowedLateness(allowedDataLateness)
.withClosingBehavior(closingBehavior);
- return nonCombining(strategy);
+ return nonCombining(strategy, triggerStateMachine);
}
- public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
- combining(WindowingStrategy<?, W> strategy,
+ /**
+ * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and
+ * {@link KeyedCombineFn}, creating a {@link TriggerStateMachine} from the
+ * {@link Trigger} in the {@link WindowingStrategy}.
+ */
+ public static <W extends BoundedWindow, AccumT, OutputT>
+ ReduceFnTester<Integer, OutputT, W> combining(
+ WindowingStrategy<?, W> strategy,
KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
- Coder<OutputT> outputCoder) throws Exception {
+ Coder<OutputT> outputCoder)
+ throws Exception {
+
+ CoderRegistry registry = new CoderRegistry();
+ registry.registerStandardCoders();
+ AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
+ AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
+ combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+ return combining(
+ strategy,
+ TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+ combineFn,
+ outputCoder);
+ }
+
+ /**
+ * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy},
+ * {@link KeyedCombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
+ * between {@link ReduceFnRunner} and the {@link TriggerStateMachine}.
+ * Ignores the {@link Trigger} in the {@link WindowingStrategy}.
+ */
+ public static <W extends BoundedWindow, AccumT, OutputT>
+ ReduceFnTester<Integer, OutputT, W> combining(
+ WindowingStrategy<?, W> strategy,
+ TriggerStateMachine triggerStateMachine,
+ KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+ Coder<OutputT> outputCoder)
+ throws Exception {
CoderRegistry registry = new CoderRegistry();
registry.registerStandardCoders();
@@ -158,18 +223,45 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return new ReduceFnTester<Integer, OutputT, W>(
strategy,
+ triggerStateMachine,
SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(StringUtf8Coder.of(), fn),
outputCoder,
PipelineOptionsFactory.create(),
NullSideInputReader.empty());
}
- public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
- combining(WindowingStrategy<?, W> strategy,
+ public static <W extends BoundedWindow, AccumT, OutputT>
+ ReduceFnTester<Integer, OutputT, W> combining(
+ WindowingStrategy<?, W> strategy,
+ KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+ Coder<OutputT> outputCoder,
+ PipelineOptions options,
+ SideInputReader sideInputReader)
+ throws Exception {
+ CoderRegistry registry = new CoderRegistry();
+ registry.registerStandardCoders();
+ AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
+ AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
+ combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+ return combining(
+ strategy,
+ TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+ combineFn,
+ outputCoder,
+ options,
+ sideInputReader);
+ }
+
+ public static <W extends BoundedWindow, AccumT, OutputT>
+ ReduceFnTester<Integer, OutputT, W> combining(
+ WindowingStrategy<?, W> strategy,
+ TriggerStateMachine triggerStateMachine,
KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
Coder<OutputT> outputCoder,
PipelineOptions options,
- SideInputReader sideInputReader) throws Exception {
+ SideInputReader sideInputReader)
+ throws Exception {
CoderRegistry registry = new CoderRegistry();
registry.registerStandardCoders();
AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
@@ -178,29 +270,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return new ReduceFnTester<Integer, OutputT, W>(
strategy,
+ triggerStateMachine,
SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(StringUtf8Coder.of(), fn),
outputCoder,
options,
sideInputReader);
}
- public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
- combining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
- KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder,
- Duration allowedDataLateness) throws Exception {
-
- WindowingStrategy<?, W> strategy =
- WindowingStrategy.of(windowFn)
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
- .withTrigger(trigger)
- .withMode(mode)
- .withAllowedLateness(allowedDataLateness);
-
- return combining(strategy, combineFn, outputCoder);
- }
- private ReduceFnTester(WindowingStrategy<?, W> wildcardStrategy,
- ReduceFn<String, InputT, OutputT, W> reduceFn, Coder<OutputT> outputCoder,
- PipelineOptions options, SideInputReader sideInputReader) throws Exception {
+ private ReduceFnTester(
+ WindowingStrategy<?, W> wildcardStrategy,
+ TriggerStateMachine triggerStateMachine,
+ ReduceFn<String, InputT, OutputT, W> reduceFn,
+ Coder<OutputT> outputCoder,
+ PipelineOptions options,
+ SideInputReader sideInputReader)
+ throws Exception {
@SuppressWarnings("unchecked")
WindowingStrategy<Object, W> objectStrategy = (WindowingStrategy<Object, W>) wildcardStrategy;
@@ -208,8 +292,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
this.reduceFn = reduceFn;
this.windowFn = objectStrategy.getWindowFn();
this.windowingInternals = new TestWindowingInternals(sideInputReader);
+ this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine);
this.outputCoder = outputCoder;
- this.executableTrigger = wildcardStrategy.getTrigger();
this.options = options;
}
@@ -226,6 +310,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return new ReduceFnRunner<>(
KEY,
objectStrategy,
+ executableTriggerStateMachine,
stateInternals,
timerInternals,
windowingInternals,
@@ -234,10 +319,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
options);
}
- public ExecutableTrigger getTrigger() {
- return executableTrigger;
- }
-
public boolean isMarkedFinished(W window) {
return createRunner().isFinished(window);
}
@@ -250,7 +331,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
- ImmutableSet.<StateTag<? super String, ?>>of(TriggerRunner.FINISHED_BITS_TAG));
+ ImmutableSet.<StateTag<? super String, ?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
}
@SafeVarargs
@@ -258,7 +339,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
assertHasOnlyGlobalAndAllowedTags(
ImmutableSet.copyOf(expectedWindows),
ImmutableSet.<StateTag<? super String, ?>>of(
- TriggerRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
+ TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
WatermarkHold.EXTRA_HOLD_TAG));
}