You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/25 17:14:06 UTC

[4/5] incubator-beam git commit: Remove pieces of Trigger now owned by TriggerStateMachine

Remove pieces of Trigger now owned by TriggerStateMachine


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1eff320d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1eff320d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1eff320d

Branch: refs/heads/master
Commit: 1eff320d5e7fb5510d13016e0826b14e5cf7f686
Parents: dfaf2a8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 12:57:37 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 25 10:12:56 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/AfterAll.java |  49 --
 .../windowing/AfterDelayFromFirstElement.java   |  99 ----
 .../sdk/transforms/windowing/AfterEach.java     |  61 ---
 .../sdk/transforms/windowing/AfterFirst.java    |  50 --
 .../sdk/transforms/windowing/AfterPane.java     |  52 --
 .../windowing/AfterProcessingTime.java          |   7 -
 .../AfterSynchronizedProcessingTime.java        |   7 -
 .../transforms/windowing/AfterWatermark.java    | 158 ------
 .../transforms/windowing/DefaultTrigger.java    |  35 --
 .../beam/sdk/transforms/windowing/Never.java    |  17 -
 .../transforms/windowing/OrFinallyTrigger.java  |  46 --
 .../sdk/transforms/windowing/Repeatedly.java    |  30 --
 .../beam/sdk/transforms/windowing/Trigger.java  | 412 ++-------------
 .../apache/beam/sdk/util/ExecutableTrigger.java |  40 +-
 .../apache/beam/sdk/util/FinishedTriggers.java  |  44 --
 .../beam/sdk/util/FinishedTriggersBitSet.java   |  67 ---
 .../beam/sdk/util/FinishedTriggersSet.java      |  72 ---
 .../apache/beam/sdk/util/ReshuffleTrigger.java  |  14 -
 .../beam/sdk/util/TriggerContextFactory.java    | 507 -------------------
 .../sdk/transforms/windowing/AfterAllTest.java  |  98 ----
 .../sdk/transforms/windowing/AfterEachTest.java |  64 ---
 .../transforms/windowing/AfterFirstTest.java    | 120 -----
 .../sdk/transforms/windowing/AfterPaneTest.java |  77 ---
 .../windowing/AfterProcessingTimeTest.java      |  94 ----
 .../AfterSynchronizedProcessingTimeTest.java    |  75 ---
 .../windowing/AfterWatermarkTest.java           | 308 -----------
 .../windowing/DefaultTriggerTest.java           | 130 -----
 .../sdk/transforms/windowing/NeverTest.java     |  34 +-
 .../windowing/OrFinallyTriggerTest.java         | 136 -----
 .../transforms/windowing/RepeatedlyTest.java    | 161 +-----
 .../sdk/transforms/windowing/StubTrigger.java   |  17 -
 .../sdk/transforms/windowing/TriggerTest.java   |  28 -
 .../beam/sdk/util/ExecutableTriggerTest.java    |  18 -
 .../sdk/util/FinishedTriggersBitSetTest.java    |  55 --
 .../sdk/util/FinishedTriggersProperties.java    | 110 ----
 .../beam/sdk/util/FinishedTriggersSetTest.java  |  60 ---
 .../beam/sdk/util/ReshuffleTriggerTest.java     |  23 -
 .../org/apache/beam/sdk/util/TriggerTester.java | 410 ---------------
 38 files changed, 77 insertions(+), 3708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
index 0e37d33..c3f0848 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.joda.time.Instant;
 
 /**
@@ -46,27 +45,6 @@ public class AfterAll extends OnceTrigger {
   }
 
   @Override
-  public void onElement(OnElementContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) {
-      // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
-      // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
-      subTrigger.invokeOnElement(c);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    boolean allFinished = true;
-    for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) {
-      allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
-    }
-    c.trigger().setFinished(allFinished);
-  }
-
-  @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger will fire after the latest of its sub-triggers.
     Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -84,33 +62,6 @@ public class AfterAll extends OnceTrigger {
     return new AfterAll(continuationTriggers);
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true} if all subtriggers return {@code true}.
-   */
-  @Override
-  public boolean shouldFire(TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      if (!context.forTrigger(subtrigger).trigger().isFinished()
-          && !subtrigger.invokeShouldFire(context)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
-   * because they all must be ready to fire.
-   */
-  @Override
-  public void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      subtrigger.invokeOnFire(context);
-    }
-  }
-
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("AfterAll.of(");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index 6078b34..9daecb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -18,11 +18,9 @@
 package org.apache.beam.sdk.transforms.windowing;
 
 import com.google.common.collect.ImmutableList;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.transforms.Combine;
@@ -31,10 +29,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.joda.time.Duration;
@@ -62,12 +56,6 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
   private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
 
   /**
-   * To complete an implementation, return the desired time from the TriggerContext.
-   */
-  @Nullable
-  public abstract Instant getCurrentTime(Trigger.TriggerContext context);
-
-  /**
    * To complete an implementation, return a new instance like this one, but incorporating
    * the provided timestamp mapping functions. Generally should be used by calling the
    * constructor of this class from the constructor of the subclass.
@@ -92,10 +80,6 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
     this.timeDomain = timeDomain;
   }
 
-  private Instant getTargetTimestamp(OnElementContext c) {
-    return computeTargetTimestamp(c.currentProcessingTime());
-  }
-
   /**
    * The time domain according for which this trigger sets timers.
    */
@@ -170,93 +154,10 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
   }
 
   @Override
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
-      "prefetch side effect")
-  public void prefetchOnElement(StateAccessor<?> state) {
-    state.access(DELAYED_UNTIL_TAG).readLater();
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
-    Instant oldDelayUntil = delayUntilState.read();
-
-    // Since processing time can only advance, resulting in target wake-up times we would
-    // ignore anyhow, we don't bother with it if it is already set.
-    if (oldDelayUntil != null) {
-      return;
-    }
-
-    Instant targetTimestamp = getTargetTimestamp(c);
-    delayUntilState.add(targetTimestamp);
-    c.setTimer(targetTimestamp, timeDomain);
-  }
-
-  @Override
-  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
-    super.prefetchOnMerge(state);
-    StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    // NOTE: We could try to delete all timers which are still active, but we would
-    // need access to a timer context for each merging window.
-    // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state :
-    //    c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
-    //   Instant timestamp = state.get().read();
-    //   if (timestamp != null) {
-    //     <context for merging window>.deleteTimer(timestamp, timeDomain);
-    //   }
-    // }
-    // Instead let them fire and be ignored.
-
-    // If the trigger is already finished, there is no way it will become re-activated
-    if (c.trigger().isFinished()) {
-      StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
-      // NOTE: We do not attempt to delete  the timers.
-      return;
-    }
-
-    // Determine the earliest point across all the windows, and delay to that.
-    StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
-
-    Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
-    if (earliestTargetTime != null) {
-      c.setTimer(earliestTargetTime, timeDomain);
-    }
-  }
-
-  @Override
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
-      "prefetch side effect")
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    state.access(DELAYED_UNTIL_TAG).readLater();
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception {
-    c.state().access(DELAYED_UNTIL_TAG).clear();
-  }
-
-  @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     return BoundedWindow.TIMESTAMP_MAX_VALUE;
   }
 
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
-    return delayedUntil != null
-        && getCurrentTime(context) != null
-        && getCurrentTime(context).isAfter(delayedUntil);
-  }
-
-  @Override
-  protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
-    clear(context);
-  }
-
   protected Instant computeTargetTimestamp(Instant time) {
     Instant result = time;
     for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index 961d97f..872ad46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -23,7 +23,6 @@ import com.google.common.base.Joiner;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.joda.time.Instant;
 
 /**
@@ -59,41 +58,6 @@ public class AfterEach extends Trigger {
   }
 
   @Override
-  public void onElement(OnElementContext c) throws Exception {
-    if (!c.trigger().isMerging()) {
-      // If merges are not possible, we need only run the first unfinished subtrigger
-      c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
-    } else {
-      // If merges are possible, we need to run all subtriggers in parallel
-      for (ExecutableTrigger subTrigger :  c.trigger().subTriggers()) {
-        // Even if the subTrigger is done, it may be revived via merging and must have
-        // adequate state.
-        subTrigger.invokeOnElement(c);
-      }
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext context) throws Exception {
-    // If merging makes a subtrigger no-longer-finished, it will automatically
-    // begin participating in shouldFire and onFire appropriately.
-
-    // All the following triggers are retroactively "not started" but that is
-    // also automatic because they are cleared whenever this trigger
-    // fires.
-    boolean priorTriggersAllFinished = true;
-    for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
-      if (priorTriggersAllFinished) {
-        subTrigger.invokeOnMerge(context);
-        priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
-      } else {
-        subTrigger.invokeClear(context);
-      }
-    }
-    updateFinishedState(context);
-  }
-
-  @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger will fire at least once when the first trigger in the sequence
     // fires at least once.
@@ -106,27 +70,6 @@ public class AfterEach extends Trigger {
   }
 
   @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
-    return firstUnfinished.invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception {
-    context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
-
-    // Reset all subtriggers if in a merging context; any may be revived by merging so they are
-    // all run in parallel for each pending pane.
-    if (context.trigger().isMerging()) {
-      for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
-        subTrigger.invokeClear(context);
-      }
-    }
-
-    updateFinishedState(context);
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
     Joiner.on(", ").appendTo(builder, subTriggers);
@@ -134,8 +77,4 @@ public class AfterEach extends Trigger {
 
     return builder.toString();
   }
-
-  private void updateFinishedState(TriggerContext context) {
-    context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
index 7840fc4..a742b43 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.joda.time.Instant;
 
 /**
@@ -47,21 +46,6 @@ public class AfterFirst extends OnceTrigger {
   }
 
   @Override
-  public void onElement(OnElementContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnElement(c);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    updateFinishedStatus(c);
-  }
-
-  @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger will fire after the earliest of its sub-triggers.
     Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
@@ -80,32 +64,6 @@ public class AfterFirst extends OnceTrigger {
   }
 
   @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      if (context.forTrigger(subtrigger).trigger().isFinished()
-          || subtrigger.invokeShouldFire(context)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  protected void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      TriggerContext subContext = context.forTrigger(subtrigger);
-      if (subtrigger.invokeShouldFire(subContext)) {
-        // If the trigger is ready to fire, then do whatever it needs to do.
-        subtrigger.invokeOnFire(subContext);
-      } else {
-        // If the trigger is not ready to fire, it is nonetheless true that whatever
-        // pending pane it was tracking is now gone.
-        subtrigger.invokeClear(subContext);
-      }
-    }
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("AfterFirst.of(");
     Joiner.on(", ").appendTo(builder, subTriggers);
@@ -113,12 +71,4 @@ public class AfterFirst extends OnceTrigger {
 
     return builder.toString();
   }
-
-  private void updateFinishedStatus(TriggerContext c) {
-    boolean anyFinished = false;
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
-    }
-    c.trigger().setFinished(anyFinished);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 4d59d58..4a706e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -25,9 +24,6 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.joda.time.Instant;
@@ -65,49 +61,6 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
   }
 
   @Override
-  public void onElement(OnElementContext c) throws Exception {
-    c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
-  }
-
-  @Override
-  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
-    super.prefetchOnMerge(state);
-    StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext context) throws Exception {
-    // If we've already received enough elements and finished in some window,
-    // then this trigger is just finished.
-    if (context.trigger().finishedInAnyMergingWindow()) {
-      context.trigger().setFinished(true);
-      StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
-      return;
-    }
-
-    // Otherwise, compute the sum of elements in all the active panes.
-    StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
-  }
-
-  @Override
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
-      "prefetch side effect")
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    state.access(ELEMENTS_IN_PANE_TAG).readLater();
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
-    return count >= countElems;
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception {
-    c.state().access(ELEMENTS_IN_PANE_TAG).clear();
-  }
-
-  @Override
   public boolean isCompatible(Trigger other) {
     return this.equals(other);
   }
@@ -143,9 +96,4 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
   public int hashCode() {
     return Objects.hash(countElems);
   }
-
-  @Override
-  protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
-    clear(context);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
index f551118..09f288e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import java.util.List;
 import java.util.Objects;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
@@ -36,12 +35,6 @@ import org.joda.time.Instant;
 @Experimental(Experimental.Kind.TRIGGER)
 public class AfterProcessingTime extends AfterDelayFromFirstElement {
 
-  @Override
-  @Nullable
-  public Instant getCurrentTime(Trigger.TriggerContext context) {
-    return context.currentProcessingTime();
-  }
-
   private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) {
     super(TimeDomain.PROCESSING_TIME, transforms);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index b96b293..b6258f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing;
 import com.google.common.base.Objects;
 import java.util.Collections;
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Instant;
@@ -31,12 +30,6 @@ import org.joda.time.Instant;
  */
 public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
 
-  @Override
-  @Nullable
-  public Instant getCurrentTime(Trigger.TriggerContext context) {
-    return context.currentSynchronizedProcessingTime();
-  }
-
   public AfterSynchronizedProcessingTime() {
     super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
         Collections.<SerializableFunction<Instant, Instant>>emptyList());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index 89c1ba9..37b73a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -25,7 +25,6 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Instant;
 
@@ -111,50 +110,6 @@ public class AfterWatermark {
     }
 
     @Override
-    public void onElement(OnElementContext c) throws Exception {
-      if (!c.trigger().isMerging()) {
-        // If merges can never happen, we just run the unfinished subtrigger
-        c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
-      } else {
-        // If merges can happen, we run for all subtriggers because they might be
-        // de-activated or re-activated
-        for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-          subTrigger.invokeOnElement(c);
-        }
-      }
-    }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception {
-      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
-      // merged-away windows.
-
-      ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
-      // We check the early trigger to determine if we are still processing it or
-      // if the end of window has transitioned us to the late trigger
-      OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
-
-      // If the early trigger is still active in any merging window then it is still active in
-      // the new merged window, because even if the merged window is "done" some pending elements
-      // haven't had a chance to fire.
-      if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) {
-        earlyContext.trigger().setFinished(false);
-        if (lateTrigger != null) {
-          ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX);
-          OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
-          lateContext.trigger().setFinished(false);
-          lateSubtrigger.invokeClear(lateContext);
-        }
-      } else {
-        // Otherwise the early trigger and end-of-window bit is done for good.
-        earlyContext.trigger().setFinished(true);
-        if (lateTrigger != null) {
-          c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
-        }
-      }
-    }
-
-    @Override
     public Trigger getContinuationTrigger() {
       return new AfterWatermarkEarlyAndLate(
           earlyTrigger.getContinuationTrigger(),
@@ -173,38 +128,6 @@ public class AfterWatermark {
       return window.maxTimestamp();
     }
 
-    private boolean endOfWindowReached(Trigger.TriggerContext context) {
-      return context.currentEventTime() != null
-          && context.currentEventTime().isAfter(context.window().maxTimestamp());
-    }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-      if (!context.trigger().isFinished(EARLY_INDEX)) {
-        // We have not yet transitioned to late firings.
-        // We should fire if either the trigger is ready or we reach the end of the window.
-        return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
-            || endOfWindowReached(context);
-      } else if (lateTrigger == null) {
-        return false;
-      } else {
-        // We are running the late trigger
-        return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
-      }
-    }
-
-    @Override
-    public void onFire(Trigger.TriggerContext context) throws Exception {
-      if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
-        onNonLateFiring(context);
-      } else if (lateTrigger != null) {
-        onLateFiring(context);
-      } else {
-        // all done
-        context.trigger().setFinished(true);
-      }
-    }
-
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder(TO_STRING);
@@ -225,47 +148,6 @@ public class AfterWatermark {
 
       return builder.toString();
     }
-
-    private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
-      // We have not yet transitioned to late firings.
-      ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
-      Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
-
-      if (!endOfWindowReached(context)) {
-        // This is an early firing, since we have not arrived at the end of the window
-        // Implicitly repeats
-        earlySubtrigger.invokeOnFire(context);
-        earlySubtrigger.invokeClear(context);
-        earlyContext.trigger().setFinished(false);
-      } else {
-        // We have arrived at the end of the window; terminate the early trigger
-        // and clear out the late trigger's state
-        if (earlySubtrigger.invokeShouldFire(context)) {
-          earlySubtrigger.invokeOnFire(context);
-        }
-        earlyContext.trigger().setFinished(true);
-        earlySubtrigger.invokeClear(context);
-
-        if (lateTrigger == null) {
-          // Done if there is no late trigger.
-          context.trigger().setFinished(true);
-        } else {
-          // If there is a late trigger, we transition to it, and need to clear its state
-          // because it was run in parallel.
-          context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
-        }
-      }
-
-    }
-
-    private void onLateFiring(Trigger.TriggerContext context) throws Exception {
-      // We are firing the late trigger, with implicit repeat
-      ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
-      lateSubtrigger.invokeOnFire(context);
-      // It is a OnceTrigger, so it must have finished; unfinished it and clear it
-      lateSubtrigger.invokeClear(context);
-      context.forTrigger(lateSubtrigger).trigger().setFinished(false);
-    }
   }
 
   /**
@@ -296,33 +178,6 @@ public class AfterWatermark {
     }
 
     @Override
-    public void onElement(OnElementContext c) throws Exception {
-      // We're interested in knowing when the input watermark passes the end of the window.
-      // (It is possible this has already happened, in which case the timer will be fired
-      // almost immediately).
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception {
-      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
-      // merged-away windows.
-
-      if (!c.trigger().finishedInAllMergingWindows()) {
-        // If the trigger is still active in any merging window then it is still active in the new
-        // merged window, because even if the merged window is "done" some pending elements haven't
-        // had a chance to fire
-        c.trigger().setFinished(false);
-      } else if (!endOfWindowReached(c)) {
-        // If the end of the new window has not been reached, then the trigger is active again.
-        c.trigger().setFinished(false);
-      } else {
-        // Otherwise it is done for good
-        c.trigger().setFinished(true);
-      }
-    }
-
-    @Override
     public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
       return window.maxTimestamp();
     }
@@ -346,18 +201,5 @@ public class AfterWatermark {
     public int hashCode() {
       return Objects.hash(getClass());
     }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-      return endOfWindowReached(context);
-    }
-
-    private boolean endOfWindowReached(Trigger.TriggerContext context) {
-      return context.currentEventTime() != null
-          && context.currentEventTime().isAfter(context.window().maxTimestamp());
-    }
-
-    @Override
-    protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
index fee7cdf..a649b4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Instant;
 
 /**
@@ -41,27 +40,6 @@ public class DefaultTrigger extends Trigger{
   }
 
   @Override
-  public void onElement(OnElementContext c) throws Exception {
-    // If the end of the window has already been reached, then we are already ready to fire
-    // and do not need to set a wake-up timer.
-    if (!endOfWindowReached(c)) {
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    // If the end of the window has already been reached, then we are already ready to fire
-    // and do not need to set a wake-up timer.
-    if (!endOfWindowReached(c)) {
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception { }
-
-  @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     return window.maxTimestamp();
   }
@@ -76,17 +54,4 @@ public class DefaultTrigger extends Trigger{
   public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
     return this;
   }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return endOfWindowReached(context);
-  }
-
-  private boolean endOfWindowReached(Trigger.TriggerContext context) {
-    return context.currentEventTime() != null
-        && context.currentEventTime().isAfter(context.window().maxTimestamp());
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception { }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 07b70f4..664ae83 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -48,12 +48,6 @@ public final class Never {
     }
 
     @Override
-    public void onElement(OnElementContext c) {}
-
-    @Override
-    public void onMerge(OnMergeContext c) {}
-
-    @Override
     protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
       return this;
     }
@@ -62,16 +56,5 @@ public final class Never {
     public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
       return BoundedWindow.TIMESTAMP_MAX_VALUE;
     }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) {
-      return false;
-    }
-
-    @Override
-    protected void onOnlyFiring(Trigger.TriggerContext context) {
-      throw new UnsupportedOperationException(
-          String.format("%s should never fire", getClass().getSimpleName()));
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 9bef45a..1ed9b55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.joda.time.Instant;
 
 /**
@@ -51,20 +50,6 @@ public class OrFinallyTrigger extends Trigger {
   }
 
   @Override
-  public void onElement(OnElementContext c) throws Exception {
-    c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
-    c.trigger().subTrigger(UNTIL).invokeOnElement(c);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    updateFinishedState(c);
-  }
-
-  @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger fires once either the trigger or the until trigger fires.
     Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
@@ -83,38 +68,7 @@ public class OrFinallyTrigger extends Trigger {
   }
 
   @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
-        || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception {
-    ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL);
-    ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL);
-
-    if (untilSubtrigger.invokeShouldFire(context)) {
-      untilSubtrigger.invokeOnFire(context);
-      actualSubtrigger.invokeClear(context);
-    } else {
-      // If until didn't fire, then the actual must have (or it is forbidden to call
-      // onFire) so we are done only if actual is done.
-      actualSubtrigger.invokeOnFire(context);
-      // Do not clear the until trigger, because it tracks data cross firings.
-    }
-    updateFinishedState(context);
-  }
-
-  @Override
   public String toString() {
     return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
   }
-
-  private void updateFinishedState(TriggerContext c) throws Exception {
-    boolean anyStillFinished = false;
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
-    }
-    c.trigger().setFinished(anyStillFinished);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
index 45bc6c1..4d79a2c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.joda.time.Instant;
 
 /**
@@ -61,16 +60,6 @@ public class Repeatedly extends Trigger {
   }
 
   @Override
-  public void onElement(OnElementContext c) throws Exception {
-    getRepeated(c).invokeOnElement(c);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    getRepeated(c).invokeOnMerge(c);
-  }
-
-  @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger fires once the repeated trigger fires.
     return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
@@ -82,26 +71,7 @@ public class Repeatedly extends Trigger {
   }
 
   @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return getRepeated(context).invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(TriggerContext context) throws Exception {
-    getRepeated(context).invokeOnFire(context);
-
-    if (context.trigger().isFinished(REPEATED)) {
-      // Reset tree will recursively clear the finished bits, and invoke clear.
-      context.forTrigger(getRepeated(context)).trigger().resetTree();
-    }
-  }
-
-  @Override
   public String toString() {
     return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
   }
-
-  private ExecutableTrigger getRepeated(TriggerContext context) {
-    return context.trigger().subTrigger(REPEATED);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index 18b7a62..1cc807e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -23,22 +23,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.joda.time.Instant;
 
 /**
- * {@code Trigger}s control when the elements for a specific key and window are output. As elements
- * arrive, they are put into one or more windows by a {@link Window} transform and its associated
- * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the
- * {@code Window}s contents should be output.
+ * {@link Trigger Triggers} control when the elements for a specific key and window are output. As
+ * elements arrive, they are put into one or more windows by a {@link Window} transform and its
+ * associated {@link WindowFn}, and then passed to the associated {@link Trigger} to determine if
+ * the {@link BoundedWindow Window's} contents should be output.
  *
- * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window}
- * for more information about how grouping with windows works.
+ * <p>See {@link GroupByKey} and {@link Window} for more information about how grouping with windows
+ * works.
  *
  * <p>The elements that are assigned to a window since the last time it was fired (or since the
  * window was created) are placed into the current window pane. Triggers are evaluated against the
@@ -46,224 +42,34 @@ import org.joda.time.Instant;
  * output. When the root trigger finishes (indicating it will never fire again), the window is
  * closed and any new elements assigned to that window are discarded.
  *
- * <p>Several predefined {@code Trigger}s are provided:
+ * <p>Several predefined {@link Trigger Triggers} are provided:
+ *
  * <ul>
- *   <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
- *   either the end of the window or the arrival of the first element in a pane.
- *   <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
- *   (typically since the first element in a pane).
- *   <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as
- *   the number of elements that have been assigned to the current pane.
+ * <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
+ *     either the end of the window or the arrival of the first element in a pane.
+ * <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
+ *     (typically since the first element in a pane).
+ * <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as the
+ *     number of elements that have been assigned to the current pane.
  * </ul>
  *
  * <p>In addition, {@code Trigger}s can be combined in a variety of ways:
- * <ul>
- *   <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its
- *   argument finishes it gets reset and starts over. Can be combined with
- *   {@link Trigger#orFinally} to specify a condition that causes the repetition to stop.
- *   <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every)
- *   time that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
- *   <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
- *   fires. An {@link AfterFirst} trigger finishes after it fires once.
- *   <li> {@link AfterAll#of} to create a trigger that fires after all least one of its arguments
- *   have fired at least once. An {@link AfterAll} trigger finishes after it fires once.
- * </ul>
  *
- * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one
- * of the following states:
  * <ul>
- *   <li> Never Existed - before the trigger has started executing, there is no state associated
- *   with it anywhere in the system. A trigger moves to the executing state as soon as it
- *   processes in the current pane.
- *   <li> Executing - while the trigger is receiving items and may fire. While it is in this state,
- *   it may persist book-keeping information to persisted state, set timers, etc.
- *   <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the
- *   system remembers only that it is finished. Entering this state causes us to discard any
- *   elements in the buffer for that window, as well.
+ * <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its argument
+ *     finishes it gets reset and starts over. Can be combined with {@link Trigger#orFinally} to
+ *     specify a condition that causes the repetition to stop.
+ * <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every) time
+ *     that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
+ * <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
+ *     fires. An {@link AfterFirst} trigger finishes after it fires once.
+ * <li> {@link AfterAll#of} to create a trigger that fires after all least one of its arguments have
+ *     fired at least once. An {@link AfterAll} trigger finishes after it fires once.
  * </ul>
- *
- * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite
- * trigger could reset its sub-triggers.
- *
- * <p>Triggers should not build up any state internally since they may be recreated
- * between invocations of the callbacks. All important values should be persisted using
- * state before the callback returns.
  */
 @Experimental(Experimental.Kind.TRIGGER)
 public abstract class Trigger implements Serializable {
 
-  /**
-   * Interface for accessing information about the trigger being executed and other triggers in the
-   * same tree.
-   */
-  public interface TriggerInfo {
-
-    /**
-     * Returns true if the windowing strategy of the current {@code PCollection} is a merging
-     * WindowFn. If true, the trigger execution needs to keep enough information to support the
-     * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will
-     * never be called.
-     */
-    boolean isMerging();
-
-    /**
-     * Access the executable versions of the sub-triggers of the current trigger.
-     */
-    Iterable<ExecutableTrigger> subTriggers();
-
-    /**
-     * Access the executable version of the specified sub-trigger.
-     */
-    ExecutableTrigger subTrigger(int subtriggerIndex);
-
-    /**
-     * Returns true if the current trigger is marked finished.
-     */
-    boolean isFinished();
-
-    /**
-     * Return true if the given subtrigger is marked finished.
-     */
-    boolean isFinished(int subtriggerIndex);
-
-    /**
-     * Returns true if all the sub-triggers of the current trigger are marked finished.
-     */
-    boolean areAllSubtriggersFinished();
-
-    /**
-     * Returns an iterable over the unfinished sub-triggers of the current trigger.
-     */
-    Iterable<ExecutableTrigger> unfinishedSubTriggers();
-
-    /**
-     * Returns the first unfinished sub-trigger.
-     */
-    ExecutableTrigger firstUnfinishedSubTrigger();
-
-    /**
-     * Clears all keyed state for triggers in the current sub-tree and unsets all the associated
-     * finished bits.
-     */
-    void resetTree() throws Exception;
-
-    /**
-     * Sets the finished bit for the current trigger.
-     */
-    void setFinished(boolean finished);
-
-    /**
-     * Sets the finished bit for the given sub-trigger.
-     */
-    void setFinished(boolean finished, int subTriggerIndex);
-  }
-
-  /**
-   * Interact with properties of the trigger being executed, with extensions to deal with the
-   * merging windows.
-   */
-  public interface MergingTriggerInfo extends TriggerInfo {
-
-    /** Return true if the trigger is finished in any window being merged. */
-    boolean finishedInAnyMergingWindow();
-
-    /** Return true if the trigger is finished in all windows being merged. */
-    boolean finishedInAllMergingWindows();
-  }
-
-  /**
-   * Information accessible to all operational hooks in this {@code Trigger}.
-   *
-   * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and
-   * extended with additional information in other methods.
-   */
-  public abstract class TriggerContext {
-
-    /** Returns the interface for accessing trigger info. */
-    public abstract TriggerInfo trigger();
-
-    /** Returns the interface for accessing persistent state. */
-    public abstract StateAccessor<?> state();
-
-    /** The window that the current context is executing in. */
-    public abstract BoundedWindow window();
-
-    /** Create a sub-context for the given sub-trigger. */
-    public abstract TriggerContext forTrigger(ExecutableTrigger trigger);
-
-    /**
-     * Removes the timer set in this trigger context for the given {@link Instant}
-     * and {@link TimeDomain}.
-     */
-    public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
-
-    /** The current processing time. */
-    public abstract Instant currentProcessingTime();
-
-    /** The current synchronized upstream processing time or {@code null} if unknown. */
-    @Nullable
-    public abstract Instant currentSynchronizedProcessingTime();
-
-    /** The current event time for the input or {@code null} if unknown. */
-    @Nullable
-    public abstract Instant currentEventTime();
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the {@link #onElement}
-   * operational hook.
-   */
-  public abstract class OnElementContext extends TriggerContext {
-    /** The event timestamp of the element currently being processed. */
-    public abstract Instant eventTimestamp();
-
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
-     * timer firings for a window will be received, but the implementation should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /** Create an {@code OnElementContext} for executing the given trigger. */
-    @Override
-    public abstract OnElementContext forTrigger(ExecutableTrigger trigger);
-  }
-
-  /**
-   * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge}
-   * operational hook.
-   */
-  public abstract class OnMergeContext extends TriggerContext {
-    /**
-     * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
-     * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
-     *
-     * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
-     * timer firings for a window will be received, but the implementation should choose to ignore
-     * those that are not applicable.
-     *
-     * @param timestamp the time at which the trigger should be re-evaluated
-     * @param domain the domain that the {@code timestamp} applies to
-     */
-    public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
-    /** Create an {@code OnMergeContext} for executing the given trigger. */
-    @Override
-    public abstract OnMergeContext forTrigger(ExecutableTrigger trigger);
-
-    @Override
-    public abstract MergingStateAccessor<?, ?> state();
-
-    @Override
-    public abstract MergingTriggerInfo trigger();
-  }
-
   protected final List<Trigger> subTriggers;
 
   protected Trigger(List<Trigger> subTriggers) {
@@ -274,114 +80,14 @@ public abstract class Trigger implements Serializable {
     this(Collections.EMPTY_LIST);
   }
 
-  /**
-   * Called every time an element is incorporated into a window.
-   */
-  public abstract void onElement(OnElementContext c) throws Exception;
-
-  /**
-   * Called immediately after windows have been merged.
-   *
-   * <p>Leaf triggers should update their state by inspecting their status and any state
-   * in the merging windows. Composite triggers should update their state by calling
-   * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic.
-   *
-   * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished;
-   * it is the responsibility of the trigger itself to record this fact. It is forbidden for
-   * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
-   * elements that led to it being ready to fire.
-   *
-   * <p>The implementation does not need to clear out any state associated with the old windows.
-   */
-  public abstract void onMerge(OnMergeContext c) throws Exception;
-
-  /**
-   * Returns {@code true} if the current state of the trigger indicates that its condition
-   * is satisfied and it is ready to fire.
-   */
-  public abstract boolean shouldFire(TriggerContext context) throws Exception;
-
-  /**
-   * Adjusts the state of the trigger to be ready for the next pane. For example, a
-   * {@link Repeatedly} trigger will reset its inner trigger, since it has fired.
-   *
-   * <p>If the trigger is finished, it is the responsibility of the trigger itself to
-   * record that fact via the {@code context}.
-   */
-  public abstract void onFire(TriggerContext context) throws Exception;
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #onElement} call.
-   */
-  public void prefetchOnElement(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchOnElement(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #onMerge} call.
-   */
-  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchOnMerge(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #shouldFire} call.
-   */
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchShouldFire(state);
-      }
-    }
-  }
-
-  /**
-   * Called to allow the trigger to prefetch any state it will likely need to read from during
-   * an {@link #onFire} call.
-   */
-  public void prefetchOnFire(StateAccessor<?> state) {
-    if (subTriggers != null) {
-      for (Trigger trigger : subTriggers) {
-        trigger.prefetchOnFire(state);
-      }
-    }
-  }
-
-  /**
-   * Clear any state associated with this trigger in the given window.
-   *
-   * <p>This is called after a trigger has indicated it will never fire again. The trigger system
-   * keeps enough information to know that the trigger is finished, so this trigger should clear all
-   * of its state.
-   */
-  public void clear(TriggerContext c) throws Exception {
-    if (subTriggers != null) {
-      for (ExecutableTrigger trigger : c.trigger().subTriggers()) {
-        trigger.invokeClear(c);
-      }
-    }
-  }
-
   public List<Trigger> subTriggers() {
     return subTriggers;
   }
 
   /**
-   * Return a trigger to use after a {@code GroupByKey} to preserve the
-   * intention of this trigger. Specifically, triggers that are time based
-   * and intended to provide speculative results should continue providing
-   * speculative results. Triggers that fire once (or multiple times) should
+   * Return a trigger to use after a {@link GroupByKey} to preserve the intention of this trigger.
+   * Specifically, triggers that are time based and intended to provide speculative results should
+   * continue providing speculative results. Triggers that fire once (or multiple times) should
    * continue firing once (or multiple times).
    */
   public Trigger getContinuationTrigger() {
@@ -397,27 +103,24 @@ public abstract class Trigger implements Serializable {
   }
 
   /**
-   * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this
-   * is provided the continuation trigger of each of the sub-triggers.
+   * Return the {@link #getContinuationTrigger} of this {@link Trigger}. For convenience, this is
+   * provided the continuation trigger of each of the sub-triggers.
    */
   protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers);
 
   /**
-   * Returns a bound in watermark time by which this trigger would have fired at least once
-   * for a given window had there been input data.  This is a static property of a trigger
-   * that does not depend on its state.
+   * Returns a bound in event time by which this trigger would have fired at least once for a given
+   * window had there been input data.
    *
-   * <p>For triggers that do not fire based on the watermark advancing, returns
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+   * <p>For triggers that do not fire based on the watermark advancing, returns {@link
+   * BoundedWindow#TIMESTAMP_MAX_VALUE}.
    *
-   * <p>This estimate is used to determine that there are no elements in a side-input window, which
-   * causes the default value to be used instead.
+   * <p>This estimate may be used, for example, to determine that there are no elements in a
+   * side-input window, which causes the default value to be used instead.
    */
   public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window);
 
-  /**
-   * Returns whether this performs the same triggering as the given {@code Trigger}.
-   */
+  /** Returns whether this performs the same triggering as the given {@link Trigger}. */
   public boolean isCompatible(Trigger other) {
     if (!getClass().equals(other.getClass())) {
       return false;
@@ -472,31 +175,33 @@ public abstract class Trigger implements Serializable {
   }
 
   /**
-   * Specify an ending condition for this trigger. If the {@code until} fires then the combination
-   * fires.
+   * Specify an ending condition for this trigger. If the {@code until} {@link Trigger} fires then
+   * the combination fires.
    *
-   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes
-   * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
-   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that
-   * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2}
-   * has seen are necessarily in the current pane.
+   * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes as
+   * soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
+   * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that {@code
+   * t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} has
+   * seen are necessarily in the current pane.
    *
    * <p>For example the final firing of the following trigger may only have 1 element:
-   * <pre> {@code
+   *
+   * <pre>{@code
    * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
    *     .orFinally(AfterPane.elementCountAtLeast(5))
-   * } </pre>
+   * }
+   * </pre>
    *
-   * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
-   * as {@code AfterFirst.of(t1, t2)}.
+   * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same as
+   * {@code AfterFirst.of(t1, t2)}.
    */
   public OrFinallyTrigger orFinally(OnceTrigger until) {
     return new OrFinallyTrigger(this, until);
   }
 
   /**
-   * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather
-   * than the general {@link Trigger} class to indicate that behavior.
+   * {@link Trigger Triggers} that are guaranteed to fire at most once should extend {@link
+   * OnceTrigger} rather than the general {@link Trigger} class to indicate that behavior.
    */
   public abstract static class OnceTrigger extends Trigger {
     protected OnceTrigger(List<Trigger> subTriggers) {
@@ -511,20 +216,5 @@ public abstract class Trigger implements Serializable {
       }
       return (OnceTrigger) continuation;
     }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public final void onFire(TriggerContext context) throws Exception {
-      onOnlyFiring(context);
-      context.trigger().setFinished(true);
-    }
-
-    /**
-     * Called exactly once by {@link #onFire} when the trigger is fired. By default,
-     * invokes {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}.
-     */
-    protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
index 088c499..48a49aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
@@ -29,9 +29,13 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
 
 /**
  * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the
- * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
+ * times (both in the same trigger expression and in other trigger expressions), the {@code
+ * ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
+ *
+ * @deprecated uses of {@link ExecutableTrigger} should be ported to
+ *     org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.
  */
+@Deprecated
 public class ExecutableTrigger implements Serializable {
 
   /** Store the index assigned to this trigger. */
@@ -115,38 +119,6 @@ public class ExecutableTrigger implements Serializable {
   }
 
   /**
-   * Invoke the {@link Trigger#onElement} method for this trigger, ensuring that the bits are
-   * properly updated if the trigger finishes.
-   */
-  public void invokeOnElement(Trigger.OnElementContext c) throws Exception {
-    trigger.onElement(c.forTrigger(this));
-  }
-
-  /**
-   * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that the bits are properly
-   * updated.
-   */
-  public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception {
-    Trigger.OnMergeContext subContext = c.forTrigger(this);
-    trigger.onMerge(subContext);
-  }
-
-  public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception {
-    return trigger.shouldFire(c.forTrigger(this));
-  }
-
-  public void invokeOnFire(Trigger.TriggerContext c) throws Exception {
-    trigger.onFire(c.forTrigger(this));
-  }
-
-  /**
-   * Invoke clear for the current this trigger.
-   */
-  public void invokeClear(Trigger.TriggerContext c) throws Exception {
-    trigger.clear(c.forTrigger(this));
-  }
-
-  /**
    * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH
    * and never just FIRE.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
deleted file mode 100644
index ea14c40..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-/**
- * A mutable set which tracks whether any particular {@link ExecutableTrigger} is
- * finished.
- */
-public interface FinishedTriggers {
-  /**
-   * Returns {@code true} if the trigger is finished.
-   */
-  boolean isFinished(ExecutableTrigger trigger);
-
-  /**
-   * Sets the fact that the trigger is finished.
-   */
-  void setFinished(ExecutableTrigger trigger, boolean value);
-
-  /**
-   * Sets the trigger and all of its subtriggers to unfinished.
-   */
-  void clearRecursively(ExecutableTrigger trigger);
-
-  /**
-   * Create an independent copy of this mutable {@link FinishedTriggers}.
-   */
-  FinishedTriggers copy();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
deleted file mode 100644
index 4cd617f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.BitSet;
-
-/**
- * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}.
- */
-public class FinishedTriggersBitSet implements FinishedTriggers {
-
-  private final BitSet bitSet;
-
-  private FinishedTriggersBitSet(BitSet bitSet) {
-    this.bitSet = bitSet;
-  }
-
-  public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
-    return new FinishedTriggersBitSet(new BitSet(capacity));
-  }
-
-  public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
-    return new FinishedTriggersBitSet(bitSet);
-  }
-
-  /**
-   * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}.
-   */
-  public BitSet getBitSet() {
-    return bitSet;
-  }
-
-  @Override
-  public boolean isFinished(ExecutableTrigger trigger) {
-    return bitSet.get(trigger.getTriggerIndex());
-  }
-
-  @Override
-  public void setFinished(ExecutableTrigger trigger, boolean value) {
-    bitSet.set(trigger.getTriggerIndex(), value);
-  }
-
-  @Override
-  public void clearRecursively(ExecutableTrigger trigger) {
-    bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree());
-  }
-
-  @Override
-  public FinishedTriggersBitSet copy() {
-    return new FinishedTriggersBitSet((BitSet) bitSet.clone());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
deleted file mode 100644
index a9feb73..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.Sets;
-import java.util.Set;
-
-/**
- * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}.
- */
-public class FinishedTriggersSet implements FinishedTriggers {
-
-  private final Set<ExecutableTrigger> finishedTriggers;
-
-  private FinishedTriggersSet(Set<ExecutableTrigger> finishedTriggers) {
-    this.finishedTriggers = finishedTriggers;
-  }
-
-  public static FinishedTriggersSet fromSet(Set<ExecutableTrigger> finishedTriggers) {
-    return new FinishedTriggersSet(finishedTriggers);
-  }
-
-  /**
-   * Returns a mutable {@link Set} of the underlying triggers that are finished.
-   */
-  public Set<ExecutableTrigger> getFinishedTriggers() {
-    return finishedTriggers;
-  }
-
-  @Override
-  public boolean isFinished(ExecutableTrigger trigger) {
-    return finishedTriggers.contains(trigger);
-  }
-
-  @Override
-  public void setFinished(ExecutableTrigger trigger, boolean value) {
-    if (value) {
-      finishedTriggers.add(trigger);
-    } else {
-      finishedTriggers.remove(trigger);
-    }
-  }
-
-  @Override
-  public void clearRecursively(ExecutableTrigger trigger) {
-    finishedTriggers.remove(trigger);
-    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-      clearRecursively(subTrigger);
-    }
-  }
-
-  @Override
-  public FinishedTriggersSet copy() {
-    return fromSet(Sets.newHashSet(finishedTriggers));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
index 437f14a..8dd648a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
@@ -35,12 +35,6 @@ public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
   }
 
   @Override
-  public void onElement(Trigger.OnElementContext c) { }
-
-  @Override
-  public void onMerge(Trigger.OnMergeContext c) { }
-
-  @Override
   protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
     return this;
   }
@@ -52,14 +46,6 @@ public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
   }
 
   @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return true;
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception { }
-
-  @Override
   public String toString() {
     return "ReshuffleTrigger()";
   }