You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 19:51:56 UTC

[1/4] incubator-beam git commit: Translate ReshuffleTrigger to ReshuffleTriggerStateMachine

Repository: incubator-beam
Updated Branches:
  refs/heads/master 472cf0ec0 -> a69f888e8


Translate ReshuffleTrigger to ReshuffleTriggerStateMachine


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be07065f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be07065f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be07065f

Branch: refs/heads/master
Commit: be07065fe6ae404cc36d21c95ff5025e0ed4233c
Parents: 33d9baa
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 10:14:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:41:04 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/triggers/TriggerStateMachines.java | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be07065f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
index 317e3b9..f19f3cf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ReshuffleTrigger;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Instant;
 
@@ -100,6 +101,10 @@ public class TriggerStateMachines {
       return DefaultTriggerStateMachine.of();
     }
 
+    private TriggerStateMachine evaluateSpecific(ReshuffleTrigger v) {
+      return new ReshuffleTriggerStateMachine();
+    }
+
     private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) {
       return AfterWatermarkStateMachine.pastEndOfWindow();
     }


[2/4] incubator-beam git commit: Delete TriggerRunner

Posted by ke...@apache.org.
Delete TriggerRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33d9baaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33d9baaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33d9baaf

Branch: refs/heads/master
Commit: 33d9baaf5778c565632f6fe98344b8f1bd8a1d75
Parents: 3237440
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 17:41:39 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:41:04 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/TriggerRunner.java | 247 -------------------
 1 file changed, 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33d9baaf/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
deleted file mode 100644
index 8d0f322..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.BitSetCoder;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.FinishedTriggers;
-import org.apache.beam.sdk.util.FinishedTriggersBitSet;
-import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.TriggerContextFactory;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.joda.time.Instant;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- *       constructing the appropriate trigger contexts.</li>
- *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
- *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- *   <li>Clearing out the persisted finished set when a caller indicates
- *       (via {#link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
-  private final ExecutableTrigger rootTrigger;
-  private final TriggerContextFactory<W> contextFactory;
-
-  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
-    checkState(rootTrigger.getTriggerIndex() == 0);
-    this.rootTrigger = rootTrigger;
-    this.contextFactory = contextFactory;
-  }
-
-  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // If no trigger in the tree will ever have finished bits, then we don't need to read them.
-      // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
-      // finished) for each trigger in the tree.
-      return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
-    }
-
-    BitSet bitSet = state.read();
-    return bitSet == null
-        ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
-            : FinishedTriggersBitSet.fromBitSet(bitSet);
-  }
-
-
-  private void clearFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // Nothing to clear.
-      return;
-    }
-    state.clear();
-  }
-
-  /** Return true if the trigger is closed in the window corresponding to the specified state. */
-  public boolean isClosed(StateAccessor<?> state) {
-    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnElement(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchShouldFire(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Run the trigger logic to deal with a new value.
-   */
-  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
-      throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
-        window, timers, timestamp, rootTrigger, finishedSet);
-    rootTrigger.invokeOnElement(triggerContext);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForMerge(
-      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
-    if (isFinishedSetNeeded()) {
-      for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
-        value.readLater();
-      }
-    }
-    rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
-        window, mergingWindows, rootTrigger));
-  }
-
-  /**
-   * Run the trigger merging logic as part of executing the specified merge.
-   */
-  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
-    // And read the finished bits in each merging window.
-    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
-    for (Map.Entry<W, ValueState<BitSet>> entry :
-        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
-      // Don't need to clone these, since the trigger context doesn't allow modification
-      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
-      // Clear the underlying finished bits.
-      clearFinishedBits(entry.getValue());
-    }
-    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
-    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
-        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
-    // Run the merge from the trigger
-    rootTrigger.invokeOnMerge(mergeContext);
-
-    persistFinishedSet(state, finishedSet);
-  }
-
-  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    return rootTrigger.invokeShouldFire(context);
-  }
-
-  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // shouldFire should be false.
-    // However it is too expensive to assert.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    rootTrigger.invokeOnFire(context);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-
-    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      if (modifiedFinishedSet.getBitSet().isEmpty()) {
-        finishedSetState.clear();
-      } else {
-        finishedSetState.write(modifiedFinishedSet.getBitSet());
-      }
-    }
-  }
-
-  /**
-   * Clear the finished bits.
-   */
-  public void clearFinished(StateAccessor<?> state) {
-    clearFinishedBits(state.access(FINISHED_BITS_TAG));
-  }
-
-  /**
-   * Clear the state used for executing triggers, but leave the finished set to indicate
-   * the window is closed.
-   */
-  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // Don't need to clone, because we'll be clearing the finished bits anyways.
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
-    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
-  }
-
-  private boolean isFinishedSetNeeded() {
-    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
-    // lookup. Right now, we special case this for the DefaultTrigger.
-    return !(rootTrigger.getSpec() instanceof DefaultTrigger);
-  }
-}


[4/4] incubator-beam git commit: This closes #1138

Posted by ke...@apache.org.
This closes #1138


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a69f888e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a69f888e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a69f888e

Branch: refs/heads/master
Commit: a69f888e830c8998ca8e29eada054f7b938918d6
Parents: 472cf0e be07065
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 20 12:43:11 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:43:11 2016 -0700

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   5 +
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   5 +
 .../beam/runners/core/ReduceFnRunner.java       |  37 +--
 .../apache/beam/runners/core/TriggerRunner.java | 247 ----------------
 .../triggers/TriggerStateMachineRunner.java     |   2 +-
 .../core/triggers/TriggerStateMachines.java     |   5 +
 .../beam/runners/core/ReduceFnRunnerTest.java   | 281 +++++++++++--------
 .../beam/runners/core/ReduceFnTester.java       | 157 ++++++++---
 8 files changed, 318 insertions(+), 421 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Port ReduceFnRunner to TriggerStateMachine

Posted by ke...@apache.org.
Port ReduceFnRunner to TriggerStateMachine


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3237440e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3237440e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3237440e

Branch: refs/heads/master
Commit: 3237440e2120555a90d74dfaae1d7b44b2d17203
Parents: 472cf0e
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 16:48:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:41:04 2016 -0700

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   5 +
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   5 +
 .../beam/runners/core/ReduceFnRunner.java       |  37 +--
 .../triggers/TriggerStateMachineRunner.java     |   2 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 281 +++++++++++--------
 .../beam/runners/core/ReduceFnTester.java       | 157 ++++++++---
 6 files changed, 313 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index b427037..75a5aa7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.core;
 
 import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
@@ -81,6 +83,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
         new ReduceFnRunner<>(
             key,
             windowingStrategy,
+            ExecutableTriggerStateMachine.create(
+                TriggerStateMachines.stateMachineForTrigger(
+                    windowingStrategy.getTrigger().getSpec())),
             stateInternals,
             timerInternals,
             c.windowingInternals(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 23986df..4dea775 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.core;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -70,6 +72,9 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
         new ReduceFnRunner<K, InputT, OutputT, W>(
             key,
             strategy,
+            ExecutableTriggerStateMachine.create(
+                TriggerStateMachines.stateMachineForTrigger(
+                    strategy.getTrigger().getSpec())),
             stateInternals,
             timerInternals,
             c.windowingInternals(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 24d472b..78c4e0b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -34,6 +34,9 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks;
 import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
+import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -50,7 +53,6 @@ import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.TriggerContextFactory;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -69,25 +71,25 @@ import org.joda.time.Instant;
  * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
  * {@link PCollection} by key.
  *
- * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of
- * the triggering logic. The {@code ReduceFnRunner}s responsibilities are:
+ * <p>The {@link #onTrigger} relies on a {@link TriggerStateMachineRunner} to manage the execution
+ * of the triggering logic. The {@code ReduceFnRunner}s responsibilities are:
  *
  * <ul>
- * <li>Tracking the windows that are active (have buffered data) as elements arrive and
- * triggers are fired.
- * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
- * when the trigger fires.
+ * <li>Tracking the windows that are active (have buffered data) as elements arrive and triggers are
+ *     fired.
+ * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it when the
+ *     trigger fires.
  * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
- * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions
- * such as output.
+ *     firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions such as
+ *     output.
  * <li>Scheduling garbage collection of state associated with a specific window, and making that
- * happen when the appropriate timer fires.
+ *     happen when the appropriate timer fires.
  * </ul>
  *
- * @param <K>       The type of key being processed.
- * @param <InputT>  The type of values associated with the key.
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of values associated with the key.
  * @param <OutputT> The output type that will be produced for each key.
- * @param <W>       The type of windows this operates on.
+ * @param <W> The type of windows this operates on.
  */
 public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements TimerCallback {
 
@@ -165,7 +167,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
    * garbage collected.
    * </ul>
    */
-  private final TriggerRunner<W> triggerRunner;
+  private final TriggerStateMachineRunner<W> triggerRunner;
 
   /**
    * Store the output watermark holds for each window.
@@ -212,6 +214,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
   public ReduceFnRunner(
       K key,
       WindowingStrategy<?, W> windowingStrategy,
+      ExecutableTriggerStateMachine triggerStateMachine,
       StateInternals<K> stateInternals,
       TimerInternals timerInternals,
       WindowingInternals<?, KV<K, OutputT>> windowingInternals,
@@ -242,9 +245,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
 
     this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
     this.triggerRunner =
-        new TriggerRunner<>(
-            windowingStrategy.getTrigger(),
-            new TriggerContextFactory<>(
+        new TriggerStateMachineRunner<>(
+            triggerStateMachine,
+            new TriggerStateMachineContextFactory<>(
                 windowingStrategy.getWindowFn(), stateInternals, activeWindows));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 0ffbbca..9f03216 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -57,7 +57,7 @@ import org.joda.time.Instant;
  */
 public class TriggerStateMachineRunner<W extends BoundedWindow> {
   @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+  public static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
       StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
 
   private final ExecutableTriggerStateMachine rootTrigger;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 4d5680c..20eb08b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -38,6 +38,7 @@ import static org.mockito.Mockito.withSettings;
 import com.google.common.collect.Iterables;
 import java.util.Iterator;
 import java.util.List;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.WindowMatchers;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -94,23 +95,23 @@ import org.mockito.stubbing.Answer;
 @RunWith(JUnit4.class)
 public class ReduceFnRunnerTest {
   @Mock private SideInputReader mockSideInputReader;
-  private Trigger mockTrigger;
+  private TriggerStateMachine mockTriggerStateMachine;
   private PCollectionView<Integer> mockView;
 
   private IntervalWindow firstWindow;
 
-  private static Trigger.TriggerContext anyTriggerContext() {
-    return Mockito.<Trigger.TriggerContext>any();
+  private static TriggerStateMachine.TriggerContext anyTriggerContext() {
+    return Mockito.<TriggerStateMachine.TriggerContext>any();
   }
-  private static Trigger.OnElementContext anyElementContext() {
-    return Mockito.<Trigger.OnElementContext>any();
+  private static TriggerStateMachine.OnElementContext anyElementContext() {
+    return Mockito.<TriggerStateMachine.OnElementContext>any();
   }
 
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
 
-    mockTrigger = mock(Trigger.class, withSettings().serializable());
+    mockTriggerStateMachine = mock(TriggerStateMachine.class, withSettings().serializable());
 
     @SuppressWarnings("unchecked")
     PCollectionView<Integer> mockViewUnchecked =
@@ -121,17 +122,17 @@ public class ReduceFnRunnerTest {
 
   private void injectElement(ReduceFnTester<Integer, ?, IntervalWindow> tester, int element)
       throws Exception {
-    doNothing().when(mockTrigger).onElement(anyElementContext());
+    doNothing().when(mockTriggerStateMachine).onElement(anyElementContext());
     tester.injectElements(TimestampedValue.of(element, new Instant(element)));
   }
 
-  private void triggerShouldFinish(Trigger mockTrigger) throws Exception {
+  private void triggerShouldFinish(TriggerStateMachine mockTrigger) throws Exception {
     doAnswer(new Answer<Void>() {
       @Override
       public Void answer(InvocationOnMock invocation) throws Exception {
         @SuppressWarnings("unchecked")
-        Trigger.TriggerContext context =
-            (Trigger.TriggerContext) invocation.getArguments()[0];
+        TriggerStateMachine.TriggerContext context =
+            (TriggerStateMachine.TriggerContext) invocation.getArguments()[0];
         context.trigger().setFinished(true);
         return null;
       }
@@ -143,20 +144,20 @@ public class ReduceFnRunnerTest {
   public void testOnElementBufferingDiscarding() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     // Pane of {1, 2}
     injectElement(tester, 1);
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 2);
     assertThat(tester.extractOutput(),
         contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
 
     // Pane of just 3, and finish
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 3);
     assertThat(tester.extractOutput(),
             contains(isSingleWindowedValue(containsInAnyOrder(3), 3, 0, 10)));
@@ -173,19 +174,22 @@ public class ReduceFnRunnerTest {
   public void testOnElementBufferingAccumulating() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
-            AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            mockTriggerStateMachine,
+            AccumulationMode.ACCUMULATING_FIRED_PANES,
+            Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     injectElement(tester, 1);
 
     // Fires {1, 2}
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 2);
 
     // Fires {1, 2, 3} because we are in accumulating mode
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 3);
 
     // This element shouldn't be seen, because the trigger has finished
@@ -203,17 +207,27 @@ public class ReduceFnRunnerTest {
   @Test
   public void testOnElementCombiningDiscarding() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
-        FixedWindows.of(Duration.millis(10)), mockTrigger, AccumulationMode.DISCARDING_FIRED_PANES,
-        new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of(), Duration.millis(100));
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(
+            strategy,
+            mockTriggerStateMachine,
+            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of());
 
     injectElement(tester, 2);
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 3);
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 4);
 
     // This element shouldn't be seen, because the trigger has finished
@@ -267,14 +281,17 @@ public class ReduceFnRunnerTest {
         window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
 
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
+            .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+            .withAllowedLateness(allowedLateness);
+
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(
-            windowFn,
-            AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()),
-            AccumulationMode.DISCARDING_FIRED_PANES,
-            new Sum.SumIntegerFn().<String>asKeyedFn(),
-            VarIntCoder.of(),
-            allowedLateness);
+        ReduceFnTester
+            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.injectElements(TimestampedValue.of(13, elementTimestamp));
 
@@ -295,18 +312,27 @@ public class ReduceFnRunnerTest {
   @Test
   public void testOnElementCombiningAccumulating() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100));
+
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), mockTrigger,
-            AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
-            VarIntCoder.of(), Duration.millis(100));
+        ReduceFnTester.combining(
+            strategy,
+            mockTriggerStateMachine,
+            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of());
 
     injectElement(tester, 1);
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 2);
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 3);
 
     // This element shouldn't be seen, because the trigger has finished
@@ -326,7 +352,6 @@ public class ReduceFnRunnerTest {
     Integer expectedValue = 5;
     WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy
         .of(FixedWindows.of(Duration.millis(10)))
-        .withTrigger(mockTrigger)
         .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
         .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
         .withAllowedLateness(Duration.millis(100));
@@ -345,16 +370,16 @@ public class ReduceFnRunnerTest {
     SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
     ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
-        windowingStrategy, combineFn.<String>asKeyedFn(),
+        windowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(),
         VarIntCoder.of(), options, mockSideInputReader);
 
     injectElement(tester, 2);
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 3);
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 4);
 
     // This element shouldn't be seen, because the trigger has finished
@@ -373,7 +398,7 @@ public class ReduceFnRunnerTest {
   public void testWatermarkHoldAndLateData() throws Exception {
     // Test handling of late data. Specifically, ensure the watermark hold is correct.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
@@ -385,7 +410,7 @@ public class ReduceFnRunnerTest {
     injectElement(tester, 1);
     injectElement(tester, 3);
     assertEquals(new Instant(1), tester.getWatermarkHold());
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 2);
     List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
     assertThat(output, contains(
@@ -406,14 +431,14 @@ public class ReduceFnRunnerTest {
     assertEquals(new Instant(4), tester.getOutputWatermark());
 
     // Some late, some on time. Verify that we only hold to the minimum of on-time.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     tester.advanceInputWatermark(new Instant(4));
     injectElement(tester, 2);
     injectElement(tester, 3);
     assertEquals(new Instant(9), tester.getWatermarkHold());
     injectElement(tester, 5);
     assertEquals(new Instant(5), tester.getWatermarkHold());
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 4);
     output = tester.extractOutput();
     assertThat(output,
@@ -428,7 +453,7 @@ public class ReduceFnRunnerTest {
         equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
 
     // All late -- output at end of window timestamp.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     tester.advanceInputWatermark(new Instant(8));
     injectElement(tester, 6);
     injectElement(tester, 5);
@@ -436,7 +461,7 @@ public class ReduceFnRunnerTest {
     injectElement(tester, 4);
 
     // Fire the ON_TIME pane
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     tester.advanceInputWatermark(new Instant(10));
 
     // Output time is end of the window, because all the new data was late, but the pane
@@ -455,7 +480,7 @@ public class ReduceFnRunnerTest {
 
     // This is "pending" at the time the watermark makes it way-late.
     // Because we're about to expire the window, we output it.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     injectElement(tester, 8);
     assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
 
@@ -492,7 +517,7 @@ public class ReduceFnRunnerTest {
   public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
     // Make sure holds are only set if they are accompanied by an end-of-window timer.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
             ClosingBehavior.FIRE_ALWAYS);
     tester.setAutoAdvanceOutputWatermark(false);
@@ -506,9 +531,9 @@ public class ReduceFnRunnerTest {
     assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME));
 
     // Trigger the end-of-window timer.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     tester.advanceInputWatermark(new Instant(20));
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     // Hold has been replaced with garbage collection hold. Waiting for garbage collection.
     assertEquals(new Instant(29), tester.getWatermarkHold());
     assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME));
@@ -530,37 +555,37 @@ public class ReduceFnRunnerTest {
   @Test
   public void testPaneInfoAllStates() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     tester.advanceInputWatermark(new Instant(0));
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 1);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY))));
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 2);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))));
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     tester.advanceInputWatermark(new Instant(15));
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 3);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(
             PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 4);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(
             PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 5);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2))));
@@ -758,13 +783,13 @@ public class ReduceFnRunnerTest {
   @Test
   public void testPaneInfoSkipToFinish() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     tester.advanceInputWatermark(new Instant(0));
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 1);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.EARLY))));
@@ -773,13 +798,13 @@ public class ReduceFnRunnerTest {
   @Test
   public void testPaneInfoSkipToNonSpeculativeAndFinish() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     tester.advanceInputWatermark(new Instant(15));
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 1);
     assertThat(tester.extractOutput(), contains(
         WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.LATE))));
@@ -790,7 +815,8 @@ public class ReduceFnRunnerTest {
     // Verify that we merge windows before producing output so users don't see undesired
     // unmerged windows.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+            mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(0),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
@@ -823,7 +849,8 @@ public class ReduceFnRunnerTest {
   @Test
   public void testMergingWithCloseBeforeGC() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+            mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
@@ -833,8 +860,8 @@ public class ReduceFnRunnerTest {
         TimestampedValue.of(10, new Instant(10))); // in [10, 20)
 
     // Close the trigger, but the gargbage collection timer is still pending.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     tester.advanceInputWatermark(new Instant(30));
 
     // Now the garbage collection timer will fire, finding the trigger already closed.
@@ -858,7 +885,8 @@ public class ReduceFnRunnerTest {
   @Test
   public void testMergingWithCloseTrigger() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+            mockTriggerStateMachine,
                                     AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
                                     ClosingBehavior.FIRE_IF_NON_EMPTY);
 
@@ -867,14 +895,14 @@ public class ReduceFnRunnerTest {
                           TimestampedValue.of(2, new Instant(2)));
 
     // Force the trigger to be closed for the merged window.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     tester.advanceInputWatermark(new Instant(13));
 
     // Trigger is now closed.
     assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
 
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
 
     // Revisit the same session window.
     tester.injectElements(TimestampedValue.of(1, new Instant(1)),
@@ -891,7 +919,8 @@ public class ReduceFnRunnerTest {
   @Test
   public void testMergingWithReusedWindow() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+            mockTriggerStateMachine,
                                     AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
                                     ClosingBehavior.FIRE_IF_NON_EMPTY);
 
@@ -899,8 +928,8 @@ public class ReduceFnRunnerTest {
     tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
 
     // Close the trigger, but the gargbage collection timer is still pending.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     tester.advanceInputWatermark(new Instant(15));
 
     // Another element in the same session window.
@@ -932,14 +961,15 @@ public class ReduceFnRunnerTest {
   @Test
   public void testMergingWithClosedRepresentative() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+            mockTriggerStateMachine,
                                     AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
                                     ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     // 2 elements into merged session window.
     // Close the trigger, but the garbage collection timer is still pending.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     tester.injectElements(TimestampedValue.of(1, new Instant(1)),       // in [1, 11), gc at 21.
                           TimestampedValue.of(8, new Instant(8)));      // in [8, 18), gc at 28.
 
@@ -973,17 +1003,18 @@ public class ReduceFnRunnerTest {
   @Test
   public void testMergingWithClosedDoesNotPoison() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)),
+            mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
     // 1 element, force its trigger to close.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     tester.injectElements(TimestampedValue.of(2, new Instant(2)));
 
     // 3 elements, one already closed.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     tester.injectElements(TimestampedValue.of(1, new Instant(1)),
         TimestampedValue.of(2, new Instant(2)),
         TimestampedValue.of(3, new Instant(3)));
@@ -1052,7 +1083,7 @@ public class ReduceFnRunnerTest {
     // Test uninteresting (empty) panes don't increment the index or otherwise
     // modify PaneInfo.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
@@ -1062,16 +1093,16 @@ public class ReduceFnRunnerTest {
     tester.advanceInputWatermark(new Instant(12));
 
     // Fire the on-time pane
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
 
     // Fire another timer (with no data, so it's an uninteresting pane that should not be output).
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
 
     // Finish it off with another datum.
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 3);
 
     // The intermediate trigger firing shouldn't result in any output.
@@ -1097,7 +1128,7 @@ public class ReduceFnRunnerTest {
     // Test uninteresting (empty) panes don't increment the index or otherwise
     // modify PaneInfo.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
-        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
             ClosingBehavior.FIRE_IF_NON_EMPTY);
 
@@ -1107,7 +1138,7 @@ public class ReduceFnRunnerTest {
     tester.advanceInputWatermark(new Instant(12));
 
     // Trigger the on-time pane
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
     List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
     assertThat(output.size(), equalTo(1));
@@ -1117,13 +1148,13 @@ public class ReduceFnRunnerTest {
 
     // Fire another timer with no data; the empty pane should not be output even though the
     // trigger is ready to fire
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
     tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
     assertThat(tester.extractOutput().size(), equalTo(0));
 
     // Finish it off with another datum, which is late
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTrigger);
+    when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTriggerStateMachine);
     injectElement(tester, 3);
     output = tester.extractOutput();
     assertThat(output.size(), equalTo(1));
@@ -1146,19 +1177,25 @@ public class ReduceFnRunnerTest {
    */
   @Test
   public void testEmptyOnTimeFromOrFinally() throws Exception {
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTrigger(
+                AfterEach.<IntervalWindow>inOrder(
+                    Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(new Duration(5)))
+                        .orFinally(AfterWatermark.pastEndOfWindow()),
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(new Duration(25)))))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100));
+
     ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
-            AfterEach.<IntervalWindow>inOrder(
-                Repeatedly
-                    .forever(
-                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
-                            new Duration(5)))
-                    .orFinally(AfterWatermark.pastEndOfWindow()),
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
-                        new Duration(25)))),
-            AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
-            VarIntCoder.of(), Duration.millis(100));
+        ReduceFnTester
+            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));
@@ -1196,9 +1233,11 @@ public class ReduceFnRunnerTest {
    */
   @Test
   public void testProcessingTime() throws Exception {
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
-            AfterEach.<IntervalWindow>inOrder(
+
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTrigger(AfterEach.<IntervalWindow>inOrder(
                 Repeatedly
                     .forever(
                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
@@ -1206,9 +1245,13 @@ public class ReduceFnRunnerTest {
                     .orFinally(AfterWatermark.pastEndOfWindow()),
                 Repeatedly.forever(
                     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
-                        new Duration(25)))),
-            AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
-            VarIntCoder.of(), Duration.millis(100));
+                        new Duration(25)))))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester
+            .combining(strategy, new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceProcessingTime(new Instant(0));
@@ -1352,11 +1395,13 @@ public class ReduceFnRunnerTest {
   public void setGarbageCollectionHoldOnLateElements() throws Exception {
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
         ReduceFnTester.nonCombining(
-            FixedWindows.of(Duration.millis(10)),
-            AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(2)),
-            AccumulationMode.DISCARDING_FIRED_PANES,
-            Duration.millis(100),
-            ClosingBehavior.FIRE_IF_NON_EMPTY);
+            WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+                .withTrigger(
+                    AfterWatermark.pastEndOfWindow()
+                        .withLateFirings(AfterPane.elementCountAtLeast(2)))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withAllowedLateness(Duration.millis(100))
+                .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
 
     tester.advanceInputWatermark(new Instant(0));
     tester.advanceOutputWatermark(new Instant(0));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3237440e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 5752b11..f707349 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -38,6 +38,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -59,7 +63,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.SideInputReader;
@@ -106,6 +109,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   private final TestWindowingInternals windowingInternals;
   private final Coder<OutputT> outputCoder;
   private final WindowingStrategy<Object, W> objectStrategy;
+  private final ExecutableTriggerStateMachine executableTriggerStateMachine;
   private final ReduceFn<String, InputT, OutputT, W> reduceFn;
   private final PipelineOptions options;
 
@@ -117,38 +121,99 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    */
   private boolean autoAdvanceOutputWatermark = true;
 
-  private ExecutableTrigger executableTrigger;
-
   private final InMemoryLongSumAggregator droppedDueToClosedWindow =
       new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
 
+  /**
+   * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating
+   * a {@link TriggerStateMachine} from its {@link Trigger}.
+   */
   public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
       nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
     return new ReduceFnTester<Integer, Iterable<Integer>, W>(
         windowingStrategy,
+        TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger().getSpec()),
         SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
         IterableCoder.of(VarIntCoder.of()),
         PipelineOptionsFactory.create(),
         NullSideInputReader.empty());
   }
 
-  public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
-      nonCombining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
-          Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception {
+  /**
+   * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and {@link
+   * TriggerStateMachine}, for mocking the interactions between {@link ReduceFnRunner} and the
+   * {@link TriggerStateMachine}.
+   *
+   * <p>Ignores the {@link Trigger} on the {@link WindowingStrategy}.
+   */
+  public static <W extends BoundedWindow>
+      ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
+          WindowingStrategy<?, W> windowingStrategy, TriggerStateMachine triggerStateMachine)
+          throws Exception {
+    return new ReduceFnTester<>(
+        windowingStrategy,
+        triggerStateMachine,
+        SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
+        IterableCoder.of(VarIntCoder.of()),
+        PipelineOptionsFactory.create(),
+        NullSideInputReader.empty());
+  }
+
+  public static <W extends BoundedWindow>
+      ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
+          WindowFn<?, W> windowFn,
+          TriggerStateMachine triggerStateMachine,
+          AccumulationMode mode,
+          Duration allowedDataLateness,
+          ClosingBehavior closingBehavior)
+          throws Exception {
     WindowingStrategy<?, W> strategy =
         WindowingStrategy.of(windowFn)
             .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
-            .withTrigger(trigger)
             .withMode(mode)
             .withAllowedLateness(allowedDataLateness)
             .withClosingBehavior(closingBehavior);
-    return nonCombining(strategy);
+    return nonCombining(strategy, triggerStateMachine);
   }
 
-  public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
-      combining(WindowingStrategy<?, W> strategy,
+  /**
+   * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and
+   * {@link KeyedCombineFn}, creating a {@link TriggerStateMachine} from the
+   * {@link Trigger} in the {@link WindowingStrategy}.
+   */
+  public static <W extends BoundedWindow, AccumT, OutputT>
+      ReduceFnTester<Integer, OutputT, W> combining(
+          WindowingStrategy<?, W> strategy,
           KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
-          Coder<OutputT> outputCoder) throws Exception {
+          Coder<OutputT> outputCoder)
+          throws Exception {
+
+    CoderRegistry registry = new CoderRegistry();
+    registry.registerStandardCoders();
+    AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
+        AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
+            combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    return combining(
+        strategy,
+        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+        combineFn,
+        outputCoder);
+  }
+
+  /**
+   * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy},
+   * {@link KeyedCombineFn}, and {@link TriggerStateMachine}, for mocking the interaction
+   * between {@link ReduceFnRunner} and the {@link TriggerStateMachine}.
+   * Ignores the {@link Trigger} in the {@link WindowingStrategy}.
+   */
+  public static <W extends BoundedWindow, AccumT, OutputT>
+  ReduceFnTester<Integer, OutputT, W> combining(
+      WindowingStrategy<?, W> strategy,
+      TriggerStateMachine triggerStateMachine,
+      KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+      Coder<OutputT> outputCoder)
+      throws Exception {
 
     CoderRegistry registry = new CoderRegistry();
     registry.registerStandardCoders();
@@ -158,18 +223,45 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     return new ReduceFnTester<Integer, OutputT, W>(
         strategy,
+        triggerStateMachine,
         SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(StringUtf8Coder.of(), fn),
         outputCoder,
         PipelineOptionsFactory.create(),
         NullSideInputReader.empty());
   }
 
-  public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
-  combining(WindowingStrategy<?, W> strategy,
+  public static <W extends BoundedWindow, AccumT, OutputT>
+      ReduceFnTester<Integer, OutputT, W> combining(
+          WindowingStrategy<?, W> strategy,
+          KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+          Coder<OutputT> outputCoder,
+          PipelineOptions options,
+          SideInputReader sideInputReader)
+          throws Exception {
+    CoderRegistry registry = new CoderRegistry();
+    registry.registerStandardCoders();
+    AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
+        AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
+            combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    return combining(
+        strategy,
+        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+        combineFn,
+        outputCoder,
+        options,
+        sideInputReader);
+  }
+
+  public static <W extends BoundedWindow, AccumT, OutputT>
+  ReduceFnTester<Integer, OutputT, W> combining(
+      WindowingStrategy<?, W> strategy,
+      TriggerStateMachine triggerStateMachine,
       KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
       Coder<OutputT> outputCoder,
       PipelineOptions options,
-      SideInputReader sideInputReader) throws Exception {
+      SideInputReader sideInputReader)
+      throws Exception {
     CoderRegistry registry = new CoderRegistry();
     registry.registerStandardCoders();
     AppliedCombineFn<String, Integer, AccumT, OutputT> fn =
@@ -178,29 +270,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     return new ReduceFnTester<Integer, OutputT, W>(
         strategy,
+        triggerStateMachine,
         SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(StringUtf8Coder.of(), fn),
         outputCoder,
         options,
         sideInputReader);
   }
-  public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
-      combining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
-          KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder,
-          Duration allowedDataLateness) throws Exception {
-
-    WindowingStrategy<?, W> strategy =
-        WindowingStrategy.of(windowFn)
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
-            .withTrigger(trigger)
-            .withMode(mode)
-            .withAllowedLateness(allowedDataLateness);
-
-    return combining(strategy, combineFn, outputCoder);
-  }
 
-  private ReduceFnTester(WindowingStrategy<?, W> wildcardStrategy,
-      ReduceFn<String, InputT, OutputT, W> reduceFn, Coder<OutputT> outputCoder,
-      PipelineOptions options, SideInputReader sideInputReader) throws Exception {
+  private ReduceFnTester(
+      WindowingStrategy<?, W> wildcardStrategy,
+      TriggerStateMachine triggerStateMachine,
+      ReduceFn<String, InputT, OutputT, W> reduceFn,
+      Coder<OutputT> outputCoder,
+      PipelineOptions options,
+      SideInputReader sideInputReader)
+      throws Exception {
     @SuppressWarnings("unchecked")
     WindowingStrategy<Object, W> objectStrategy = (WindowingStrategy<Object, W>) wildcardStrategy;
 
@@ -208,8 +292,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     this.reduceFn = reduceFn;
     this.windowFn = objectStrategy.getWindowFn();
     this.windowingInternals = new TestWindowingInternals(sideInputReader);
+    this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine);
     this.outputCoder = outputCoder;
-    this.executableTrigger = wildcardStrategy.getTrigger();
     this.options = options;
   }
 
@@ -226,6 +310,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     return new ReduceFnRunner<>(
         KEY,
         objectStrategy,
+        executableTriggerStateMachine,
         stateInternals,
         timerInternals,
         windowingInternals,
@@ -234,10 +319,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         options);
   }
 
-  public ExecutableTrigger getTrigger() {
-    return executableTrigger;
-  }
-
   public boolean isMarkedFinished(W window) {
     return createRunner().isFinished(window);
   }
@@ -250,7 +331,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
   public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
-        ImmutableSet.<StateTag<? super String, ?>>of(TriggerRunner.FINISHED_BITS_TAG));
+        ImmutableSet.<StateTag<? super String, ?>>of(TriggerStateMachineRunner.FINISHED_BITS_TAG));
   }
 
   @SafeVarargs
@@ -258,7 +339,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     assertHasOnlyGlobalAndAllowedTags(
         ImmutableSet.copyOf(expectedWindows),
         ImmutableSet.<StateTag<? super String, ?>>of(
-            TriggerRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
+            TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG,
             WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
             WatermarkHold.EXTRA_HOLD_TAG));
   }