You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:40 UTC

[06/17] incubator-beam git commit: Rename runners-core Trigger to TriggerStateMachine

Rename runners-core Trigger to TriggerStateMachine

This is a step in separating the syntax (in the SDK), semantics
(an abstract description), and implementation (runners-core state
machines that react to elements and timers) of triggers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69b1efda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69b1efda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69b1efda

Branch: refs/heads/master
Commit: 69b1efda02c66368ebb7447389d995e5c6975d43
Parents: e4398e1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 23 20:31:10 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 13 14:34:34 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/reactors/AfterAll.java    | 122 -----
 .../reactors/AfterDelayFromFirstElement.java    | 335 ------------
 .../beam/runners/core/reactors/AfterEach.java   | 141 -----
 .../beam/runners/core/reactors/AfterFirst.java  | 124 -----
 .../beam/runners/core/reactors/AfterPane.java   | 144 -----
 .../core/reactors/AfterProcessingTime.java      | 102 ----
 .../AfterSynchronizedProcessingTime.java        |  73 ---
 .../runners/core/reactors/AfterWatermark.java   | 355 -------------
 .../runners/core/reactors/DefaultTrigger.java   |  92 ----
 .../core/reactors/ExecutableTrigger.java        | 159 ------
 .../runners/core/reactors/FinishedTriggers.java |  44 --
 .../core/reactors/FinishedTriggersBitSet.java   |  67 ---
 .../core/reactors/FinishedTriggersSet.java      |  72 ---
 .../beam/runners/core/reactors/Never.java       |  75 ---
 .../runners/core/reactors/OrFinallyTrigger.java | 105 ----
 .../beam/runners/core/reactors/Repeatedly.java  | 101 ----
 .../runners/core/reactors/ReshuffleTrigger.java |  66 ---
 .../beam/runners/core/reactors/Trigger.java     | 527 -------------------
 .../core/reactors/TriggerContextFactory.java    | 507 ------------------
 .../runners/core/reactors/TriggerRunner.java    | 247 ---------
 .../core/triggers/AfterAllStateMachine.java     | 104 ++++
 .../AfterDelayFromFirstElementStateMachine.java | 322 +++++++++++
 .../core/triggers/AfterEachStateMachine.java    | 125 +++++
 .../core/triggers/AfterFirstStateMachine.java   | 106 ++++
 .../core/triggers/AfterPaneStateMachine.java    | 132 +++++
 .../AfterProcessingTimeStateMachine.java        |  93 ++++
 ...rSynchronizedProcessingTimeStateMachine.java |  63 +++
 .../triggers/AfterWatermarkStateMachine.java    | 325 ++++++++++++
 .../triggers/DefaultTriggerStateMachine.java    |  81 +++
 .../triggers/ExecutableTriggerStateMachine.java | 160 ++++++
 .../runners/core/triggers/FinishedTriggers.java |  44 ++
 .../core/triggers/FinishedTriggersBitSet.java   |  67 +++
 .../core/triggers/FinishedTriggersSet.java      |  72 +++
 .../core/triggers/NeverStateMachine.java        |  60 +++
 .../core/triggers/OrFinallyStateMachine.java    |  85 +++
 .../core/triggers/RepeatedlyStateMachine.java   |  88 ++++
 .../triggers/ReshuffleTriggerStateMachine.java  |  50 ++
 .../core/triggers/TriggerStateMachine.java      | 487 +++++++++++++++++
 .../TriggerStateMachineContextFactory.java      | 509 ++++++++++++++++++
 .../triggers/TriggerStateMachineRunner.java     | 234 ++++++++
 .../runners/core/triggers/package-info.java     |  23 +
 .../runners/core/reactors/AfterAllTest.java     | 156 ------
 .../runners/core/reactors/AfterEachTest.java    | 132 -----
 .../runners/core/reactors/AfterFirstTest.java   | 181 -------
 .../runners/core/reactors/AfterPaneTest.java    | 132 -----
 .../core/reactors/AfterProcessingTimeTest.java  | 187 -------
 .../AfterSynchronizedProcessingTimeTest.java    | 121 -----
 .../core/reactors/AfterWatermarkTest.java       | 380 -------------
 .../core/reactors/DefaultTriggerTest.java       | 176 -------
 .../core/reactors/ExecutableTriggerTest.java    | 127 -----
 .../reactors/FinishedTriggersBitSetTest.java    |  55 --
 .../reactors/FinishedTriggersProperties.java    | 110 ----
 .../core/reactors/FinishedTriggersSetTest.java  |  60 ---
 .../beam/runners/core/reactors/NeverTest.java   |  56 --
 .../core/reactors/OrFinallyTriggerTest.java     | 215 --------
 .../runners/core/reactors/RepeatedlyTest.java   | 224 --------
 .../core/reactors/ReshuffleTriggerTest.java     |  67 ---
 .../beam/runners/core/reactors/StubTrigger.java |  70 ---
 .../beam/runners/core/reactors/TriggerTest.java | 118 -----
 .../runners/core/reactors/TriggerTester.java    | 410 ---------------
 .../core/triggers/AfterAllStateMachineTest.java | 140 +++++
 .../triggers/AfterEachStateMachineTest.java     | 108 ++++
 .../triggers/AfterFirstStateMachineTest.java    | 159 ++++++
 .../triggers/AfterPaneStateMachineTest.java     | 117 ++++
 .../AfterProcessingTimeStateMachineTest.java    | 172 ++++++
 ...chronizedProcessingTimeStateMachineTest.java | 110 ++++
 .../AfterWatermarkStateMachineTest.java         | 382 ++++++++++++++
 .../DefaultTriggerStateMachineTest.java         | 165 ++++++
 .../ExecutableTriggerStateMachineTest.java      | 108 ++++
 .../triggers/FinishedTriggersBitSetTest.java    |  55 ++
 .../triggers/FinishedTriggersProperties.java    | 115 ++++
 .../core/triggers/FinishedTriggersSetTest.java  |  60 +++
 .../core/triggers/NeverStateMachineTest.java    |  59 +++
 .../triggers/OrFinallyStateMachineTest.java     | 177 +++++++
 .../triggers/RepeatedlyStateMachineTest.java    | 200 +++++++
 .../ReshuffleTriggerStateMachineTest.java       |  68 +++
 .../core/triggers/StubTriggerStateMachine.java  |  60 +++
 .../core/triggers/TriggerStateMachineTest.java  |  98 ++++
 .../triggers/TriggerStateMachineTester.java     | 431 +++++++++++++++
 79 files changed, 6014 insertions(+), 6435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java
deleted file mode 100644
index cc8c97f..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterAll.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterAll extends OnceTrigger {
-
-  private AfterAll(List<Trigger> subTriggers) {
-    super(subTriggers);
-    checkArgument(subTriggers.size() > 1);
-  }
-
-  /**
-   * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
-   */
-  public static OnceTrigger of(OnceTrigger... triggers) {
-    return new AfterAll(Arrays.<Trigger>asList(triggers));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) {
-      // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
-      // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
-      subTrigger.invokeOnElement(c);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    boolean allFinished = true;
-    for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) {
-      allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
-    }
-    c.trigger().setFinished(allFinished);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    // This trigger will fire after the latest of its sub-triggers.
-    Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    for (Trigger subTrigger : subTriggers) {
-      Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
-      if (deadline.isBefore(subDeadline)) {
-        deadline = subDeadline;
-      }
-    }
-    return deadline;
-  }
-
-  @Override
-  public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return new AfterAll(continuationTriggers);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true} if all subtriggers return {@code true}.
-   */
-  @Override
-  public boolean shouldFire(TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      if (!context.forTrigger(subtrigger).trigger().isFinished()
-          && !subtrigger.invokeShouldFire(context)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
-   * because they all must be ready to fire.
-   */
-  @Override
-  public void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      subtrigger.invokeOnFire(context);
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("AfterAll.of(");
-    Joiner.on(", ").appendTo(builder, subTriggers);
-    builder.append(")");
-
-    return builder.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java
deleted file mode 100644
index c4bc946..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterDelayFromFirstElement.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.ImmutableList;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import java.util.Locale;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-import org.joda.time.format.PeriodFormatter;
-
-/**
- * A base class for triggers that happen after a processing time delay from the arrival
- * of the first element in a pane.
- *
- * <p>This class is for internal use only and may change at any time.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class AfterDelayFromFirstElement extends OnceTrigger {
-
-  protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
-      ImmutableList.<SerializableFunction<Instant, Instant>>of();
-
-  protected static final StateTag<Object, AccumulatorCombiningState<Instant,
-                                              Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
-      StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
-
-  private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
-
-  /**
-   * To complete an implementation, return the desired time from the TriggerContext.
-   */
-  @Nullable
-  public abstract Instant getCurrentTime(Trigger.TriggerContext context);
-
-  /**
-   * To complete an implementation, return a new instance like this one, but incorporating
-   * the provided timestamp mapping functions. Generally should be used by calling the
-   * constructor of this class from the constructor of the subclass.
-   */
-  protected abstract AfterDelayFromFirstElement newWith(
-      List<SerializableFunction<Instant, Instant>> transform);
-
-  /**
-   * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The
-   * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`,
-   * implemented via #computeTargetTimestamp
-   */
-  protected final List<SerializableFunction<Instant, Instant>> timestampMappers;
-
-  private final TimeDomain timeDomain;
-
-  public AfterDelayFromFirstElement(
-      TimeDomain timeDomain,
-      List<SerializableFunction<Instant, Instant>> timestampMappers) {
-    super(null);
-    this.timestampMappers = timestampMappers;
-    this.timeDomain = timeDomain;
-  }
-
-  private Instant getTargetTimestamp(OnElementContext c) {
-    return computeTargetTimestamp(c.currentProcessingTime());
-  }
-
-  /**
-   * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
-   * than the timestamp.
-   *
-   * <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of
-   * CalendarWindows.
-   */
-  public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) {
-    return newWith(new AlignFn(size, offset));
-  }
-
-  /**
-   * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp
-   * since the epoch.
-   */
-  public AfterDelayFromFirstElement alignedTo(final Duration size) {
-    return alignedTo(size, new Instant(0));
-  }
-
-  /**
-   * Adds some delay to the original target time.
-   *
-   * @param delay the delay to add
-   * @return An updated time trigger that will wait the additional time before firing.
-   */
-  public AfterDelayFromFirstElement plusDelayOf(final Duration delay) {
-    return newWith(new DelayFn(delay));
-  }
-
-  /**
-   * @deprecated This will be removed in the next major version. Please use only
-   *             {@link #plusDelayOf} and {@link #alignedTo}.
-   */
-  @Deprecated
-  public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> timestampMapper) {
-    return newWith(timestampMapper);
-  }
-
-  @Override
-  public boolean isCompatible(Trigger other) {
-    if (!getClass().equals(other.getClass())) {
-      return false;
-    }
-
-    AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other;
-    return this.timestampMappers.equals(that.timestampMappers);
-  }
-
-
-  private AfterDelayFromFirstElement newWith(
-      SerializableFunction<Instant, Instant> timestampMapper) {
-    return newWith(
-        ImmutableList.<SerializableFunction<Instant, Instant>>builder()
-            .addAll(timestampMappers)
-            .add(timestampMapper)
-            .build());
-  }
-
-  @Override
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
-      "prefetch side effect")
-  public void prefetchOnElement(StateAccessor<?> state) {
-    state.access(DELAYED_UNTIL_TAG).readLater();
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
-    Instant oldDelayUntil = delayUntilState.read();
-
-    // Since processing time can only advance, resulting in target wake-up times we would
-    // ignore anyhow, we don't bother with it if it is already set.
-    if (oldDelayUntil != null) {
-      return;
-    }
-
-    Instant targetTimestamp = getTargetTimestamp(c);
-    delayUntilState.add(targetTimestamp);
-    c.setTimer(targetTimestamp, timeDomain);
-  }
-
-  @Override
-  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
-    super.prefetchOnMerge(state);
-    StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    // NOTE: We could try to delete all timers which are still active, but we would
-    // need access to a timer context for each merging window.
-    // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state :
-    //    c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
-    //   Instant timestamp = state.get().read();
-    //   if (timestamp != null) {
-    //     <context for merging window>.deleteTimer(timestamp, timeDomain);
-    //   }
-    // }
-    // Instead let them fire and be ignored.
-
-    // If the trigger is already finished, there is no way it will become re-activated
-    if (c.trigger().isFinished()) {
-      StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
-      // NOTE: We do not attempt to delete  the timers.
-      return;
-    }
-
-    // Determine the earliest point across all the windows, and delay to that.
-    StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
-
-    Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
-    if (earliestTargetTime != null) {
-      c.setTimer(earliestTargetTime, timeDomain);
-    }
-  }
-
-  @Override
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
-      "prefetch side effect")
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    state.access(DELAYED_UNTIL_TAG).readLater();
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception {
-    c.state().access(DELAYED_UNTIL_TAG).clear();
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
-    return delayedUntil != null
-        && getCurrentTime(context) != null
-        && getCurrentTime(context).isAfter(delayedUntil);
-  }
-
-  @Override
-  protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
-    clear(context);
-  }
-
-  protected Instant computeTargetTimestamp(Instant time) {
-    Instant result = time;
-    for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) {
-      result = timestampMapper.apply(result);
-    }
-    return result;
-  }
-
-  /**
-   * A {@link SerializableFunction} to delay the timestamp at which this triggers fires.
-   */
-  private static final class DelayFn implements SerializableFunction<Instant, Instant> {
-    private final Duration delay;
-
-    public DelayFn(Duration delay) {
-      this.delay = delay;
-    }
-
-    @Override
-    public Instant apply(Instant input) {
-      return input.plus(delay);
-    }
-
-    @Override
-    public boolean equals(Object object) {
-      if (object == this) {
-        return true;
-      }
-
-      if (!(object instanceof DelayFn)) {
-        return false;
-      }
-
-      return this.delay.equals(((DelayFn) object).delay);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(delay);
-    }
-
-    @Override
-    public String toString() {
-      return PERIOD_FORMATTER.print(delay.toPeriod());
-    }
-  }
-
-  /**
-   * A {@link SerializableFunction} to align an instant to the nearest interval boundary.
-   */
-  static final class AlignFn implements SerializableFunction<Instant, Instant> {
-    private final Duration size;
-    private final Instant offset;
-
-
-    /**
-     * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
-     * than the timestamp.
-     */
-    public AlignFn(Duration size, Instant offset) {
-      this.size = size;
-      this.offset = offset;
-    }
-
-    @Override
-    public Instant apply(Instant point) {
-      long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis();
-      return millisSinceStart == 0 ? point : point.plus(size).minus(millisSinceStart);
-    }
-
-    @Override
-    public boolean equals(Object object) {
-      if (object == this) {
-        return true;
-      }
-
-      if (!(object instanceof AlignFn)) {
-        return false;
-      }
-
-      AlignFn other = (AlignFn) object;
-      return other.size.equals(this.size)
-          && other.offset.equals(this.offset);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(size, offset);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java
deleted file mode 100644
index 629c640..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterEach.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * A composite {@link Trigger} that executes its sub-triggers in order.
- * Only one sub-trigger is executing at a time,
- * and any time it fires the {@code AfterEach} fires. When the currently executing
- * sub-trigger finishes, the {@code AfterEach} starts executing the next sub-trigger.
- *
- * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished.
- *
- * <p>The following properties hold:
- * <ul>
- *   <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as
- *   {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c)}.
- *   <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as
- *   {@code Repeatedly.forever(a)}, since the repeated trigger never finishes.
- * </ul>
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterEach extends Trigger {
-
-  private AfterEach(List<Trigger> subTriggers) {
-    super(subTriggers);
-    checkArgument(subTriggers.size() > 1);
-  }
-
-  /**
-   * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
-   */
-  @SafeVarargs
-  public static Trigger inOrder(Trigger... triggers) {
-    return new AfterEach(Arrays.<Trigger>asList(triggers));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    if (!c.trigger().isMerging()) {
-      // If merges are not possible, we need only run the first unfinished subtrigger
-      c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
-    } else {
-      // If merges are possible, we need to run all subtriggers in parallel
-      for (ExecutableTrigger subTrigger :  c.trigger().subTriggers()) {
-        // Even if the subTrigger is done, it may be revived via merging and must have
-        // adequate state.
-        subTrigger.invokeOnElement(c);
-      }
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext context) throws Exception {
-    // If merging makes a subtrigger no-longer-finished, it will automatically
-    // begin participating in shouldFire and onFire appropriately.
-
-    // All the following triggers are retroactively "not started" but that is
-    // also automatic because they are cleared whenever this trigger
-    // fires.
-    boolean priorTriggersAllFinished = true;
-    for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
-      if (priorTriggersAllFinished) {
-        subTrigger.invokeOnMerge(context);
-        priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
-      } else {
-        subTrigger.invokeClear(context);
-      }
-    }
-    updateFinishedState(context);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    // This trigger will fire at least once when the first trigger in the sequence
-    // fires at least once.
-    return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
-  }
-
-  @Override
-  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return Repeatedly.forever(new AfterFirst(continuationTriggers));
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
-    return firstUnfinished.invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception {
-    context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
-
-    // Reset all subtriggers if in a merging context; any may be revived by merging so they are
-    // all run in parallel for each pending pane.
-    if (context.trigger().isMerging()) {
-      for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
-        subTrigger.invokeClear(context);
-      }
-    }
-
-    updateFinishedState(context);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
-    Joiner.on(", ").appendTo(builder, subTriggers);
-    builder.append(")");
-
-    return builder.toString();
-  }
-
-  private void updateFinishedState(TriggerContext context) {
-    context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java
deleted file mode 100644
index 6b06cfa..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterFirst.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have
- * fired.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterFirst extends OnceTrigger {
-
-  AfterFirst(List<Trigger> subTriggers) {
-    super(subTriggers);
-    checkArgument(subTriggers.size() > 1);
-  }
-
-  /**
-   * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
-   */
-  public static OnceTrigger of(OnceTrigger... triggers) {
-    return new AfterFirst(Arrays.<Trigger>asList(triggers));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnElement(c);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    updateFinishedStatus(c);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    // This trigger will fire after the earliest of its sub-triggers.
-    Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    for (Trigger subTrigger : subTriggers) {
-      Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
-      if (deadline.isAfter(subDeadline)) {
-        deadline = subDeadline;
-      }
-    }
-    return deadline;
-  }
-
-  @Override
-  public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return new AfterFirst(continuationTriggers);
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      if (context.forTrigger(subtrigger).trigger().isFinished()
-          || subtrigger.invokeShouldFire(context)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  protected void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
-      TriggerContext subContext = context.forTrigger(subtrigger);
-      if (subtrigger.invokeShouldFire(subContext)) {
-        // If the trigger is ready to fire, then do whatever it needs to do.
-        subtrigger.invokeOnFire(subContext);
-      } else {
-        // If the trigger is not ready to fire, it is nonetheless true that whatever
-        // pending pane it was tracking is now gone.
-        subtrigger.invokeClear(subContext);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("AfterFirst.of(");
-    Joiner.on(", ").appendTo(builder, subTriggers);
-    builder.append(")");
-
-    return builder.toString();
-  }
-
-  private void updateFinishedStatus(TriggerContext c) {
-    boolean anyFinished = false;
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
-    }
-    c.trigger().setFinished(anyFinished);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java
deleted file mode 100644
index 8c128dd..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterPane.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import java.util.Objects;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.joda.time.Instant;
-
-/**
- * {@link Trigger}s that fire based on properties of the elements in the current pane.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterPane extends OnceTrigger {
-
-private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
-      ELEMENTS_IN_PANE_TAG =
-      StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "count", VarLongCoder.of(), new Sum.SumLongFn()));
-
-  private final int countElems;
-
-  private AfterPane(int countElems) {
-    super(null);
-    this.countElems = countElems;
-  }
-
-  /**
-   * Creates a trigger that fires when the pane contains at least {@code countElems} elements.
-   */
-  public static AfterPane elementCountAtLeast(int countElems) {
-    return new AfterPane(countElems);
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
-  }
-
-  @Override
-  public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
-    super.prefetchOnMerge(state);
-    StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext context) throws Exception {
-    // If we've already received enough elements and finished in some window,
-    // then this trigger is just finished.
-    if (context.trigger().finishedInAnyMergingWindow()) {
-      context.trigger().setFinished(true);
-      StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
-      return;
-    }
-
-    // Otherwise, compute the sum of elements in all the active panes.
-    StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
-  }
-
-  @Override
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
-      "prefetch side effect")
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    state.access(ELEMENTS_IN_PANE_TAG).readLater();
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
-    return count >= countElems;
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception {
-    c.state().access(ELEMENTS_IN_PANE_TAG).clear();
-  }
-
-  @Override
-  public boolean isCompatible(Trigger other) {
-    return this.equals(other);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return AfterPane.elementCountAtLeast(1);
-  }
-
-  @Override
-  public String toString() {
-    return "AfterPane.elementCountAtLeast(" + countElems + ")";
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof AfterPane)) {
-      return false;
-    }
-    AfterPane that = (AfterPane) obj;
-    return this.countElems == that.countElems;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(countElems);
-  }
-
-  @Override
-  protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
-    clear(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java
deleted file mode 100644
index f551118..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterProcessingTime.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in
- * the real-time domain.
- *
- * <p>The time at which to fire the timer can be adjusted via the methods in
- * {@link AfterDelayFromFirstElement}, such as {@link AfterDelayFromFirstElement#plusDelayOf} or
- * {@link AfterDelayFromFirstElement#alignedTo}.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterProcessingTime extends AfterDelayFromFirstElement {
-
-  @Override
-  @Nullable
-  public Instant getCurrentTime(Trigger.TriggerContext context) {
-    return context.currentProcessingTime();
-  }
-
-  private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) {
-    super(TimeDomain.PROCESSING_TIME, transforms);
-  }
-
-  /**
-   * Creates a trigger that fires when the current processing time passes the processing time
-   * at which this trigger saw the first element in a pane.
-   */
-  public static AfterProcessingTime pastFirstElementInPane() {
-    return new AfterProcessingTime(IDENTITY);
-  }
-
-  @Override
-  protected AfterProcessingTime newWith(
-      List<SerializableFunction<Instant, Instant>> transforms) {
-    return new AfterProcessingTime(transforms);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return new AfterSynchronizedProcessingTime();
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
-    for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
-      builder
-          .append(".plusDelayOf(")
-          .append(delayFn)
-          .append(")");
-    }
-
-    return builder.toString();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof AfterProcessingTime)) {
-      return false;
-    }
-    AfterProcessingTime that = (AfterProcessingTime) obj;
-    return Objects.equals(this.timestampMappers, that.timestampMappers);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(getClass(), this.timestampMappers);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java
deleted file mode 100644
index 59ece10..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterSynchronizedProcessingTime.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.base.Objects;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
-
-  @Override
-  @Nullable
-  public Instant getCurrentTime(Trigger.TriggerContext context) {
-    return context.currentSynchronizedProcessingTime();
-  }
-
-  public AfterSynchronizedProcessingTime() {
-    super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
-        Collections.<SerializableFunction<Instant, Instant>>emptyList());
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return this;
-  }
-
-  @Override
-  public String toString() {
-    return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return this == obj || obj instanceof AfterSynchronizedProcessingTime;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(AfterSynchronizedProcessingTime.class);
-  }
-
-  @Override
-  protected AfterSynchronizedProcessingTime
-      newWith(List<SerializableFunction<Instant, Instant>> transforms) {
-    // ignore transforms
-    return this;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java
deleted file mode 100644
index e2463d8..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/AfterWatermark.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a
- * lower-bound, sometimes heuristically established, on event times that have been fully processed
- * by the pipeline.
- *
- * <p>For sources that provide non-heuristic watermarks (e.g.
- * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the
- * watermark is a strict guarantee that no data with an event time earlier than
- * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
- * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
- * of the window will be the last pane ever for that window.
- *
- * <p>For sources that provide heuristic watermarks (e.g.
- * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the
- * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that
- * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can
- * often be quite accurate, but the chance of seeing late data for any given window is non-zero.
- * Thus, if absolute correctness over time is important to your use case, you may want to consider
- * using a trigger that accounts for late data. The default trigger,
- * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires
- * once when the watermark passes the end of the window and then immediately therafter when any
- * late data arrives, is one such example.
- *
- * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
- *
- * <p>Additionaly firings before or after the watermark can be requested by calling
- * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or
- * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterWatermark {
-
-  private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";
-
-  // Static factory class.
-  private AfterWatermark() {}
-
-  /**
-   * Creates a trigger that fires when the watermark passes the end of the window.
-   */
-  public static FromEndOfWindow pastEndOfWindow() {
-    return new FromEndOfWindow();
-  }
-
-  /**
-   * @see AfterWatermark
-   */
-  public static class AfterWatermarkEarlyAndLate extends Trigger {
-
-    private static final int EARLY_INDEX = 0;
-    private static final int LATE_INDEX = 1;
-
-    private final OnceTrigger earlyTrigger;
-    @Nullable
-    private final OnceTrigger lateTrigger;
-
-    @SuppressWarnings("unchecked")
-    public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) {
-      super(lateTrigger == null
-          ? ImmutableList.<Trigger>of(earlyTrigger)
-          : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger));
-      this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null");
-      this.lateTrigger = lateTrigger;
-    }
-
-    public Trigger withEarlyFirings(OnceTrigger earlyTrigger) {
-      return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
-    }
-
-    public Trigger withLateFirings(OnceTrigger lateTrigger) {
-      return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
-    }
-
-    @Override
-    public void onElement(OnElementContext c) throws Exception {
-      if (!c.trigger().isMerging()) {
-        // If merges can never happen, we just run the unfinished subtrigger
-        c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
-      } else {
-        // If merges can happen, we run for all subtriggers because they might be
-        // de-activated or re-activated
-        for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-          subTrigger.invokeOnElement(c);
-        }
-      }
-    }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception {
-      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
-      // merged-away windows.
-
-      ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
-      // We check the early trigger to determine if we are still processing it or
-      // if the end of window has transitioned us to the late trigger
-      OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
-
-      // If the early trigger is still active in any merging window then it is still active in
-      // the new merged window, because even if the merged window is "done" some pending elements
-      // haven't had a chance to fire.
-      if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) {
-        earlyContext.trigger().setFinished(false);
-        if (lateTrigger != null) {
-          ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX);
-          OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
-          lateContext.trigger().setFinished(false);
-          lateSubtrigger.invokeClear(lateContext);
-        }
-      } else {
-        // Otherwise the early trigger and end-of-window bit is done for good.
-        earlyContext.trigger().setFinished(true);
-        if (lateTrigger != null) {
-          c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
-        }
-      }
-    }
-
-    @Override
-    public Trigger getContinuationTrigger() {
-      return new AfterWatermarkEarlyAndLate(
-          earlyTrigger.getContinuationTrigger(),
-          lateTrigger == null ? null : lateTrigger.getContinuationTrigger());
-    }
-
-    @Override
-    protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-      throw new UnsupportedOperationException(
-          "Should not call getContinuationTrigger(List<Trigger>)");
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      // Even without an early or late trigger, we'll still produce a firing at the watermark.
-      return window.maxTimestamp();
-    }
-
-    private boolean endOfWindowReached(Trigger.TriggerContext context) {
-      return context.currentEventTime() != null
-          && context.currentEventTime().isAfter(context.window().maxTimestamp());
-    }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-      if (!context.trigger().isFinished(EARLY_INDEX)) {
-        // We have not yet transitioned to late firings.
-        // We should fire if either the trigger is ready or we reach the end of the window.
-        return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
-            || endOfWindowReached(context);
-      } else if (lateTrigger == null) {
-        return false;
-      } else {
-        // We are running the late trigger
-        return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
-      }
-    }
-
-    @Override
-    public void onFire(Trigger.TriggerContext context) throws Exception {
-      if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
-        onNonLateFiring(context);
-      } else if (lateTrigger != null) {
-        onLateFiring(context);
-      } else {
-        // all done
-        context.trigger().setFinished(true);
-      }
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder builder = new StringBuilder(TO_STRING);
-
-      if (!(earlyTrigger instanceof Never.NeverTrigger)) {
-        builder
-            .append(".withEarlyFirings(")
-            .append(earlyTrigger)
-            .append(")");
-      }
-
-      if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) {
-        builder
-            .append(".withLateFirings(")
-            .append(lateTrigger)
-            .append(")");
-      }
-
-      return builder.toString();
-    }
-
-    private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
-      // We have not yet transitioned to late firings.
-      ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
-      Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
-
-      if (!endOfWindowReached(context)) {
-        // This is an early firing, since we have not arrived at the end of the window
-        // Implicitly repeats
-        earlySubtrigger.invokeOnFire(context);
-        earlySubtrigger.invokeClear(context);
-        earlyContext.trigger().setFinished(false);
-      } else {
-        // We have arrived at the end of the window; terminate the early trigger
-        // and clear out the late trigger's state
-        if (earlySubtrigger.invokeShouldFire(context)) {
-          earlySubtrigger.invokeOnFire(context);
-        }
-        earlyContext.trigger().setFinished(true);
-        earlySubtrigger.invokeClear(context);
-
-        if (lateTrigger == null) {
-          // Done if there is no late trigger.
-          context.trigger().setFinished(true);
-        } else {
-          // If there is a late trigger, we transition to it, and need to clear its state
-          // because it was run in parallel.
-          context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
-        }
-      }
-
-    }
-
-    private void onLateFiring(Trigger.TriggerContext context) throws Exception {
-      // We are firing the late trigger, with implicit repeat
-      ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
-      lateSubtrigger.invokeOnFire(context);
-      // It is a OnceTrigger, so it must have finished; unfinished it and clear it
-      lateSubtrigger.invokeClear(context);
-      context.forTrigger(lateSubtrigger).trigger().setFinished(false);
-    }
-  }
-
-  /**
-   * A watermark trigger targeted relative to the end of the window.
-   */
-  public static class FromEndOfWindow extends OnceTrigger {
-
-    private FromEndOfWindow() {
-      super(null);
-    }
-
-    /**
-     * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
-     * the given {@code Trigger} fires before the watermark has passed the end of the window.
-     */
-    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) {
-      checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
-      return new AfterWatermarkEarlyAndLate(earlyFirings, null);
-    }
-
-    /**
-     * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
-     * the given {@code Trigger} fires after the watermark has passed the end of the window.
-     */
-    public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) {
-      checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
-      return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
-    }
-
-    @Override
-    public void onElement(OnElementContext c) throws Exception {
-      // We're interested in knowing when the input watermark passes the end of the window.
-      // (It is possible this has already happened, in which case the timer will be fired
-      // almost immediately).
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception {
-      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
-      // merged-away windows.
-
-      if (!c.trigger().finishedInAllMergingWindows()) {
-        // If the trigger is still active in any merging window then it is still active in the new
-        // merged window, because even if the merged window is "done" some pending elements haven't
-        // had a chance to fire
-        c.trigger().setFinished(false);
-      } else if (!endOfWindowReached(c)) {
-        // If the end of the new window has not been reached, then the trigger is active again.
-        c.trigger().setFinished(false);
-      } else {
-        // Otherwise it is done for good
-        c.trigger().setFinished(true);
-      }
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      return window.maxTimestamp();
-    }
-
-    @Override
-    public FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) {
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return TO_STRING;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return obj instanceof FromEndOfWindow;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-      return endOfWindowReached(context);
-    }
-
-    private boolean endOfWindowReached(Trigger.TriggerContext context) {
-      return context.currentEventTime() != null
-          && context.currentEventTime().isAfter(context.window().maxTimestamp());
-    }
-
-    @Override
-    protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java
deleted file mode 100644
index d6b72ef..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/DefaultTrigger.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
- * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class DefaultTrigger extends Trigger{
-
-  private DefaultTrigger() {
-    super(null);
-  }
-
-  /**
-   * Returns the default trigger.
-   */
-  public static DefaultTrigger of() {
-    return new DefaultTrigger();
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    // If the end of the window has already been reached, then we are already ready to fire
-    // and do not need to set a wake-up timer.
-    if (!endOfWindowReached(c)) {
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    // If the end of the window has already been reached, then we are already ready to fire
-    // and do not need to set a wake-up timer.
-    if (!endOfWindowReached(c)) {
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception { }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return window.maxTimestamp();
-  }
-
-  @Override
-  public boolean isCompatible(Trigger other) {
-    // Semantically, all default triggers are identical
-    return other instanceof DefaultTrigger;
-  }
-
-  @Override
-  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    return this;
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return endOfWindowReached(context);
-  }
-
-  private boolean endOfWindowReached(Trigger.TriggerContext context) {
-    return context.currentEventTime() != null
-        && context.currentEventTime().isAfter(context.window().maxTimestamp());
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception { }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java
deleted file mode 100644
index 088c499..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/ExecutableTrigger.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-
-/**
- * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the
- * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
- */
-public class ExecutableTrigger implements Serializable {
-
-  /** Store the index assigned to this trigger. */
-  private final int triggerIndex;
-  private final int firstIndexAfterSubtree;
-  private final List<ExecutableTrigger> subTriggers = new ArrayList<>();
-  private final Trigger trigger;
-
-  public static <W extends BoundedWindow> ExecutableTrigger create(Trigger trigger) {
-    return create(trigger, 0);
-  }
-
-  private static <W extends BoundedWindow> ExecutableTrigger create(
-      Trigger trigger, int nextUnusedIndex) {
-    if (trigger instanceof OnceTrigger) {
-      return new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex);
-    } else {
-      return new ExecutableTrigger(trigger, nextUnusedIndex);
-    }
-  }
-
-  public static <W extends BoundedWindow> ExecutableTrigger createForOnceTrigger(
-      OnceTrigger trigger, int nextUnusedIndex) {
-    return new ExecutableOnceTrigger(trigger, nextUnusedIndex);
-  }
-
-  private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) {
-    this.trigger = checkNotNull(trigger, "trigger must not be null");
-    this.triggerIndex = nextUnusedIndex++;
-
-    if (trigger.subTriggers() != null) {
-      for (Trigger subTrigger : trigger.subTriggers()) {
-        ExecutableTrigger subExecutable = create(subTrigger, nextUnusedIndex);
-        subTriggers.add(subExecutable);
-        nextUnusedIndex = subExecutable.firstIndexAfterSubtree;
-      }
-    }
-    firstIndexAfterSubtree = nextUnusedIndex;
-  }
-
-  public List<ExecutableTrigger> subTriggers() {
-    return subTriggers;
-  }
-
-  @Override
-  public String toString() {
-    return trigger.toString();
-  }
-
-  /**
-   * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}.
-   */
-  public Trigger getSpec() {
-    return trigger;
-  }
-
-  public int getTriggerIndex() {
-    return triggerIndex;
-  }
-
-  public final int getFirstIndexAfterSubtree() {
-    return firstIndexAfterSubtree;
-  }
-
-  public boolean isCompatible(ExecutableTrigger other) {
-    return trigger.isCompatible(other.trigger);
-  }
-
-  public ExecutableTrigger getSubTriggerContaining(int index) {
-    checkNotNull(subTriggers);
-    checkState(index > triggerIndex && index < firstIndexAfterSubtree,
-        "Cannot find sub-trigger containing index not in this tree.");
-    ExecutableTrigger previous = null;
-    for (ExecutableTrigger subTrigger : subTriggers) {
-      if (index < subTrigger.triggerIndex) {
-        return previous;
-      }
-      previous = subTrigger;
-    }
-    return previous;
-  }
-
-  /**
-   * Invoke the {@link Trigger#onElement} method for this trigger, ensuring that the bits are
-   * properly updated if the trigger finishes.
-   */
-  public void invokeOnElement(Trigger.OnElementContext c) throws Exception {
-    trigger.onElement(c.forTrigger(this));
-  }
-
-  /**
-   * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that the bits are properly
-   * updated.
-   */
-  public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception {
-    Trigger.OnMergeContext subContext = c.forTrigger(this);
-    trigger.onMerge(subContext);
-  }
-
-  public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception {
-    return trigger.shouldFire(c.forTrigger(this));
-  }
-
-  public void invokeOnFire(Trigger.TriggerContext c) throws Exception {
-    trigger.onFire(c.forTrigger(this));
-  }
-
-  /**
-   * Invoke clear for the current this trigger.
-   */
-  public void invokeClear(Trigger.TriggerContext c) throws Exception {
-    trigger.clear(c.forTrigger(this));
-  }
-
-  /**
-   * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH
-   * and never just FIRE.
-   */
-  private static class ExecutableOnceTrigger extends ExecutableTrigger {
-
-    public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) {
-      super(trigger, nextUnusedIndex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java
deleted file mode 100644
index 6666ab9..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggers.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-/**
- * A mutable set which tracks whether any particular {@link ExecutableTrigger} is
- * finished.
- */
-public interface FinishedTriggers {
-  /**
-   * Returns {@code true} if the trigger is finished.
-   */
-  public boolean isFinished(ExecutableTrigger trigger);
-
-  /**
-   * Sets the fact that the trigger is finished.
-   */
-  public void setFinished(ExecutableTrigger trigger, boolean value);
-
-  /**
-   * Sets the trigger and all of its subtriggers to unfinished.
-   */
-  public void clearRecursively(ExecutableTrigger trigger);
-
-  /**
-   * Create an independent copy of this mutable {@link FinishedTriggers}.
-   */
-  public FinishedTriggers copy();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java
deleted file mode 100644
index 4cd617f..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersBitSet.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.BitSet;
-
-/**
- * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}.
- */
-public class FinishedTriggersBitSet implements FinishedTriggers {
-
-  private final BitSet bitSet;
-
-  private FinishedTriggersBitSet(BitSet bitSet) {
-    this.bitSet = bitSet;
-  }
-
-  public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
-    return new FinishedTriggersBitSet(new BitSet(capacity));
-  }
-
-  public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
-    return new FinishedTriggersBitSet(bitSet);
-  }
-
-  /**
-   * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}.
-   */
-  public BitSet getBitSet() {
-    return bitSet;
-  }
-
-  @Override
-  public boolean isFinished(ExecutableTrigger trigger) {
-    return bitSet.get(trigger.getTriggerIndex());
-  }
-
-  @Override
-  public void setFinished(ExecutableTrigger trigger, boolean value) {
-    bitSet.set(trigger.getTriggerIndex(), value);
-  }
-
-  @Override
-  public void clearRecursively(ExecutableTrigger trigger) {
-    bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree());
-  }
-
-  @Override
-  public FinishedTriggersBitSet copy() {
-    return new FinishedTriggersBitSet((BitSet) bitSet.clone());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java
deleted file mode 100644
index a9feb73..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/FinishedTriggersSet.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.Sets;
-import java.util.Set;
-
-/**
- * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}.
- */
-public class FinishedTriggersSet implements FinishedTriggers {
-
-  private final Set<ExecutableTrigger> finishedTriggers;
-
-  private FinishedTriggersSet(Set<ExecutableTrigger> finishedTriggers) {
-    this.finishedTriggers = finishedTriggers;
-  }
-
-  public static FinishedTriggersSet fromSet(Set<ExecutableTrigger> finishedTriggers) {
-    return new FinishedTriggersSet(finishedTriggers);
-  }
-
-  /**
-   * Returns a mutable {@link Set} of the underlying triggers that are finished.
-   */
-  public Set<ExecutableTrigger> getFinishedTriggers() {
-    return finishedTriggers;
-  }
-
-  @Override
-  public boolean isFinished(ExecutableTrigger trigger) {
-    return finishedTriggers.contains(trigger);
-  }
-
-  @Override
-  public void setFinished(ExecutableTrigger trigger, boolean value) {
-    if (value) {
-      finishedTriggers.add(trigger);
-    } else {
-      finishedTriggers.remove(trigger);
-    }
-  }
-
-  @Override
-  public void clearRecursively(ExecutableTrigger trigger) {
-    finishedTriggers.remove(trigger);
-    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
-      clearRecursively(subTrigger);
-    }
-  }
-
-  @Override
-  public FinishedTriggersSet copy() {
-    return fromSet(Sets.newHashSet(finishedTriggers));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java
deleted file mode 100644
index 5f20465..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/Never.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.joda.time.Instant;
-
-/**
- * A trigger which never fires.
- *
- * <p>Using this trigger will only produce output when the watermark passes the end of the
- * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed
- * lateness}.
- */
-public final class Never {
-  /**
-   * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
-   * when the {@link BoundedWindow} closes.
-   */
-  public static OnceTrigger ever() {
-    // NeverTrigger ignores all inputs and is Window-type independent.
-    return new NeverTrigger();
-  }
-
-  // package-private in order to check identity for string formatting.
-  static class NeverTrigger extends OnceTrigger {
-    protected NeverTrigger() {
-      super(null);
-    }
-
-    @Override
-    public void onElement(OnElementContext c) {}
-
-    @Override
-    public void onMerge(OnMergeContext c) {}
-
-    @Override
-    protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-      return this;
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE;
-    }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) {
-      return false;
-    }
-
-    @Override
-    protected void onOnlyFiring(Trigger.TriggerContext context) {
-      throw new UnsupportedOperationException(
-          String.format("%s should never fire", getClass().getSimpleName()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java
deleted file mode 100644
index 25b7b34..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/reactors/OrFinallyTrigger.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
- */
-class OrFinallyTrigger extends Trigger {
-
-  private static final int ACTUAL = 0;
-  private static final int UNTIL = 1;
-
-  @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) {
-    super(Arrays.asList(actual, until));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
-    c.trigger().subTrigger(UNTIL).invokeOnElement(c);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    updateFinishedState(c);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    // This trigger fires once either the trigger or the until trigger fires.
-    Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
-    Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window);
-    return actualDeadline.isBefore(untilDeadline) ? actualDeadline : untilDeadline;
-  }
-
-  @Override
-  public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-    // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL
-    // may not be a OnceTrigger.
-    return Repeatedly.forever(
-        new OrFinallyTrigger(
-            continuationTriggers.get(ACTUAL),
-            (Trigger.OnceTrigger) continuationTriggers.get(UNTIL)));
-  }
-
-  @Override
-  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-    return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
-        || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(Trigger.TriggerContext context) throws Exception {
-    ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL);
-    ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL);
-
-    if (untilSubtrigger.invokeShouldFire(context)) {
-      untilSubtrigger.invokeOnFire(context);
-      actualSubtrigger.invokeClear(context);
-    } else {
-      // If until didn't fire, then the actual must have (or it is forbidden to call
-      // onFire) so we are done only if actual is done.
-      actualSubtrigger.invokeOnFire(context);
-      // Do not clear the until trigger, because it tracks data cross firings.
-    }
-    updateFinishedState(context);
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
-  }
-
-  private void updateFinishedState(TriggerContext c) throws Exception {
-    boolean anyStillFinished = false;
-    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
-      anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
-    }
-    c.trigger().setFinished(anyStillFinished);
-  }
-}