You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:28 UTC

[26/50] beam git commit: Removed OnceTriggerStateMachine

Removed OnceTriggerStateMachine


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f9820b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f9820b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f9820b1

Branch: refs/heads/gearpump-runner
Commit: 4f9820b1f24103831f3b0a4f5783f9ca726f8cd7
Parents: 0d4c85d
Author: = <=>
Authored: Wed Jun 14 20:11:49 2017 -0400
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jun 28 20:14:24 2017 -0700

----------------------------------------------------------------------
 .../core/triggers/AfterAllStateMachine.java     | 25 ++++++++----------
 .../AfterDelayFromFirstElementStateMachine.java |  6 ++---
 .../core/triggers/AfterFirstStateMachine.java   | 20 +++++++--------
 .../core/triggers/AfterPaneStateMachine.java    |  6 ++---
 .../triggers/AfterWatermarkStateMachine.java    |  7 ++---
 .../triggers/ExecutableTriggerStateMachine.java | 23 +++--------------
 .../core/triggers/NeverStateMachine.java        |  5 ++--
 .../core/triggers/TriggerStateMachine.java      | 27 --------------------
 .../triggers/AfterFirstStateMachineTest.java    |  5 ++--
 .../AfterWatermarkStateMachineTest.java         |  7 +++--
 .../core/triggers/StubTriggerStateMachine.java  |  7 +++--
 11 files changed, 44 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
index 0f0c17c..3530ed1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -23,7 +23,6 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 
 /**
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental;
  * have fired.
  */
 @Experimental(Experimental.Kind.TRIGGER)
-public class AfterAllStateMachine extends OnceTriggerStateMachine {
+public class AfterAllStateMachine extends TriggerStateMachine {
 
   private AfterAllStateMachine(List<TriggerStateMachine> subTriggers) {
     super(subTriggers);
@@ -42,11 +41,11 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
    * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
    */
   @SafeVarargs
-  public static OnceTriggerStateMachine of(TriggerStateMachine... triggers) {
+  public static TriggerStateMachine of(TriggerStateMachine... triggers) {
     return new AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 
-  public static OnceTriggerStateMachine of(Iterable<? extends TriggerStateMachine> triggers) {
+  public static TriggerStateMachine of(Iterable<? extends TriggerStateMachine> triggers) {
     return new AfterAllStateMachine(ImmutableList.copyOf(triggers));
   }
 
@@ -78,24 +77,21 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
    */
   @Override
   public boolean shouldFire(TriggerContext context) throws Exception {
-    for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) {
-      if (!context.forTrigger(subtrigger).trigger().isFinished()
-          && !subtrigger.invokeShouldFire(context)) {
+    for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
+      if (!context.forTrigger(subTrigger).trigger().isFinished()
+          && !subTrigger.invokeShouldFire(context)) {
         return false;
       }
     }
     return true;
   }
 
-  /**
-   * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
-   * because they all must be ready to fire.
-   */
   @Override
-  public void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) {
-      subtrigger.invokeOnFire(context);
+  public void onFire(TriggerContext context) throws Exception {
+    for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
+      subTrigger.invokeOnFire(context);
     }
+    context.trigger().setFinished(true);
   }
 
   @Override
@@ -103,7 +99,6 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
     StringBuilder builder = new StringBuilder("AfterAll.of(");
     Joiner.on(", ").appendTo(builder, subTriggers);
     builder.append(")");
-
     return builder.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 8d8d0de..06c2066 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -27,7 +27,6 @@ import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.state.CombiningState;
@@ -50,7 +49,7 @@ import org.joda.time.format.PeriodFormatter;
 // This class should be inlined to subclasses and deleted, simplifying them too
 // https://issues.apache.org/jira/browse/BEAM-1486
 @Experimental(Experimental.Kind.TRIGGER)
-public abstract class AfterDelayFromFirstElementStateMachine extends OnceTriggerStateMachine {
+public abstract class AfterDelayFromFirstElementStateMachine extends TriggerStateMachine {
 
   protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
       ImmutableList.<SerializableFunction<Instant, Instant>>of();
@@ -237,8 +236,9 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
   }
 
   @Override
-  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception {
+  public final void onFire(TriggerContext context) throws Exception {
     clear(context);
+    context.trigger().setFinished(true);
   }
 
   protected Instant computeTargetTimestamp(Instant time) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
index 840a65c..58c24c5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
@@ -23,7 +23,6 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 
 /**
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental;
  * sub-triggers have fired.
  */
 @Experimental(Experimental.Kind.TRIGGER)
-public class AfterFirstStateMachine extends OnceTriggerStateMachine {
+public class AfterFirstStateMachine extends TriggerStateMachine {
 
   AfterFirstStateMachine(List<TriggerStateMachine> subTriggers) {
     super(subTriggers);
@@ -42,12 +41,12 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine {
    * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
    */
   @SafeVarargs
-  public static OnceTriggerStateMachine of(
+  public static TriggerStateMachine of(
       TriggerStateMachine... triggers) {
     return new AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 
-  public static OnceTriggerStateMachine of(
+  public static TriggerStateMachine of(
       Iterable<? extends TriggerStateMachine> triggers) {
     return new AfterFirstStateMachine(ImmutableList.copyOf(triggers));
   }
@@ -79,18 +78,19 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine {
   }
 
   @Override
-  protected void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) {
-      TriggerContext subContext = context.forTrigger(subtrigger);
-      if (subtrigger.invokeShouldFire(subContext)) {
+  public void onFire(TriggerContext context) throws Exception {
+    for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
+      TriggerContext subContext = context.forTrigger(subTrigger);
+      if (subTrigger.invokeShouldFire(subContext)) {
         // If the trigger is ready to fire, then do whatever it needs to do.
-        subtrigger.invokeOnFire(subContext);
+        subTrigger.invokeOnFire(subContext);
       } else {
         // If the trigger is not ready to fire, it is nonetheless true that whatever
         // pending pane it was tracking is now gone.
-        subtrigger.invokeClear(subContext);
+        subTrigger.invokeClear(subContext);
       }
     }
+    context.trigger().setFinished(true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index b9fbac3..1ce035a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -23,7 +23,6 @@ import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.state.CombiningState;
@@ -33,7 +32,7 @@ import org.apache.beam.sdk.transforms.Sum;
  * {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane.
  */
 @Experimental(Experimental.Kind.TRIGGER)
-public class AfterPaneStateMachine extends OnceTriggerStateMachine {
+public class AfterPaneStateMachine extends TriggerStateMachine {
 
 private static final StateTag<CombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
@@ -130,7 +129,8 @@ private static final StateTag<CombiningState<Long, long[], Long>>
   }
 
   @Override
-  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception {
+  public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {
     clear(context);
+    context.trigger().setFinished(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index c9eee15..509c96b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.ImmutableList;
 import java.util.Objects;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.state.TimeDomain;
 
@@ -242,7 +241,7 @@ public class AfterWatermarkStateMachine {
   /**
    * A watermark trigger targeted relative to the end of the window.
    */
-  public static class FromEndOfWindow extends OnceTriggerStateMachine {
+  public static class FromEndOfWindow extends TriggerStateMachine {
 
     private FromEndOfWindow() {
       super(null);
@@ -319,6 +318,8 @@ public class AfterWatermarkStateMachine {
     }
 
     @Override
-    protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { }
+    public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {
+      context.trigger().setFinished(true);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
index c4d89c2..cdcff64 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
 /**
@@ -46,17 +45,14 @@ public class ExecutableTriggerStateMachine implements Serializable {
 
   private static <W extends BoundedWindow> ExecutableTriggerStateMachine create(
       TriggerStateMachine trigger, int nextUnusedIndex) {
-    if (trigger instanceof OnceTriggerStateMachine) {
-      return new ExecutableOnceTriggerStateMachine(
-          (OnceTriggerStateMachine) trigger, nextUnusedIndex);
-    } else {
+
       return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex);
-    }
+
   }
 
   public static <W extends BoundedWindow> ExecutableTriggerStateMachine createForOnceTrigger(
-      OnceTriggerStateMachine trigger, int nextUnusedIndex) {
-    return new ExecutableOnceTriggerStateMachine(trigger, nextUnusedIndex);
+      TriggerStateMachine trigger, int nextUnusedIndex) {
+    return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex);
   }
 
   private ExecutableTriggerStateMachine(TriggerStateMachine trigger, int nextUnusedIndex) {
@@ -146,15 +142,4 @@ public class ExecutableTriggerStateMachine implements Serializable {
   public void invokeClear(TriggerStateMachine.TriggerContext c) throws Exception {
     trigger.clear(c.forTrigger(this));
   }
-
-  /**
-   * {@link ExecutableTriggerStateMachine} that enforces the fact that the trigger should always
-   * FIRE_AND_FINISH and never just FIRE.
-   */
-  private static class ExecutableOnceTriggerStateMachine extends ExecutableTriggerStateMachine {
-
-    public ExecutableOnceTriggerStateMachine(OnceTriggerStateMachine trigger, int nextUnusedIndex) {
-      super(trigger, nextUnusedIndex);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
index f32c7a8..f8c5e8b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.core.triggers;
 
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
@@ -27,7 +26,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  * <p>Using this trigger will only produce output when the watermark passes the end of the
  * {@link BoundedWindow window} plus the allowed lateness.
  */
-public final class NeverStateMachine extends OnceTriggerStateMachine {
+public final class NeverStateMachine extends TriggerStateMachine {
   /**
    * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
    * when the {@link BoundedWindow} closes.
@@ -53,7 +52,7 @@ public final class NeverStateMachine extends OnceTriggerStateMachine {
   }
 
   @Override
-  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) {
+  public void onFire(TriggerStateMachine.TriggerContext context) {
     throw new UnsupportedOperationException(
         String.format("%s should never fire", getClass().getSimpleName()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
index 6a2cf0c..880aa48 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
@@ -453,35 +453,8 @@ public abstract class TriggerStateMachine implements Serializable {
    * }
    * </pre>
    *
-   * <p>Note that if {@code t1} is {@link OnceTriggerStateMachine}, then {@code t1.orFinally(t2)} is
-   * the same as {@code AfterFirst.of(t1, t2)}.
    */
   public TriggerStateMachine orFinally(TriggerStateMachine until) {
     return new OrFinallyStateMachine(this, until);
   }
-
-  /**
-   * {@link TriggerStateMachine}s that are guaranteed to fire at most once should extend from this,
-   * rather than the general {@link TriggerStateMachine} class to indicate that behavior.
-   */
-  public abstract static class OnceTriggerStateMachine extends TriggerStateMachine {
-    protected OnceTriggerStateMachine(List<TriggerStateMachine> subTriggers) {
-      super(subTriggers);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public final void onFire(TriggerContext context) throws Exception {
-      onOnlyFiring(context);
-      context.trigger().setFinished(true);
-    }
-
-    /**
-     * Called exactly once by {@link #onFire} when the trigger is fired. By default,
-     * invokes {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}.
-     */
-    protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
index 453c8ff..2be90de 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -42,8 +41,8 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class AfterFirstStateMachineTest {
 
-  @Mock private OnceTriggerStateMachine mockTrigger1;
-  @Mock private OnceTriggerStateMachine mockTrigger2;
+  @Mock private TriggerStateMachine mockTrigger1;
+  @Mock private TriggerStateMachine mockTrigger2;
   private SimpleTriggerStateMachineTester<IntervalWindow> tester;
   private static TriggerStateMachine.TriggerContext anyTriggerContext() {
     return Mockito.<TriggerStateMachine.TriggerContext>any();

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
index e4d10a0..45a5cfb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,8 +45,8 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class AfterWatermarkStateMachineTest {
 
-  @Mock private OnceTriggerStateMachine mockEarly;
-  @Mock private OnceTriggerStateMachine mockLate;
+  @Mock private TriggerStateMachine mockEarly;
+  @Mock private TriggerStateMachine mockLate;
 
   private SimpleTriggerStateMachineTester<IntervalWindow> tester;
   private static TriggerStateMachine.TriggerContext anyTriggerContext() {
@@ -70,7 +69,7 @@ public class AfterWatermarkStateMachineTest {
     MockitoAnnotations.initMocks(this);
   }
 
-  public void testRunningAsTrigger(OnceTriggerStateMachine mockTrigger, IntervalWindow window)
+  public void testRunningAsTrigger(TriggerStateMachine mockTrigger, IntervalWindow window)
       throws Exception {
 
     // Don't fire due to mock saying no

http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
index 4512848..1bc757e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
@@ -18,12 +18,11 @@
 package org.apache.beam.runners.core.triggers;
 
 import com.google.common.collect.Lists;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 
 /**
- * No-op {@link OnceTriggerStateMachine} implementation for testing.
+ * No-op {@link TriggerStateMachine} implementation for testing.
  */
-abstract class StubTriggerStateMachine extends OnceTriggerStateMachine {
+abstract class StubTriggerStateMachine extends TriggerStateMachine {
   /**
    * Create a stub {@link TriggerStateMachine} instance which returns the specified name on {@link
    * #toString()}.
@@ -42,7 +41,7 @@ abstract class StubTriggerStateMachine extends OnceTriggerStateMachine {
   }
 
   @Override
-  protected void onOnlyFiring(TriggerContext context) throws Exception {
+  public void onFire(TriggerContext context) throws Exception {
   }
 
   @Override