You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/27 17:27:57 UTC
[09/50] [abbrv] incubator-beam git commit: Remove pieces of Trigger
now owned by TriggerStateMachine
Remove pieces of Trigger now owned by TriggerStateMachine
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1eff320d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1eff320d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1eff320d
Branch: refs/heads/python-sdk
Commit: 1eff320d5e7fb5510d13016e0826b14e5cf7f686
Parents: dfaf2a8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 12:57:37 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 25 10:12:56 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/windowing/AfterAll.java | 49 --
.../windowing/AfterDelayFromFirstElement.java | 99 ----
.../sdk/transforms/windowing/AfterEach.java | 61 ---
.../sdk/transforms/windowing/AfterFirst.java | 50 --
.../sdk/transforms/windowing/AfterPane.java | 52 --
.../windowing/AfterProcessingTime.java | 7 -
.../AfterSynchronizedProcessingTime.java | 7 -
.../transforms/windowing/AfterWatermark.java | 158 ------
.../transforms/windowing/DefaultTrigger.java | 35 --
.../beam/sdk/transforms/windowing/Never.java | 17 -
.../transforms/windowing/OrFinallyTrigger.java | 46 --
.../sdk/transforms/windowing/Repeatedly.java | 30 --
.../beam/sdk/transforms/windowing/Trigger.java | 412 ++-------------
.../apache/beam/sdk/util/ExecutableTrigger.java | 40 +-
.../apache/beam/sdk/util/FinishedTriggers.java | 44 --
.../beam/sdk/util/FinishedTriggersBitSet.java | 67 ---
.../beam/sdk/util/FinishedTriggersSet.java | 72 ---
.../apache/beam/sdk/util/ReshuffleTrigger.java | 14 -
.../beam/sdk/util/TriggerContextFactory.java | 507 -------------------
.../sdk/transforms/windowing/AfterAllTest.java | 98 ----
.../sdk/transforms/windowing/AfterEachTest.java | 64 ---
.../transforms/windowing/AfterFirstTest.java | 120 -----
.../sdk/transforms/windowing/AfterPaneTest.java | 77 ---
.../windowing/AfterProcessingTimeTest.java | 94 ----
.../AfterSynchronizedProcessingTimeTest.java | 75 ---
.../windowing/AfterWatermarkTest.java | 308 -----------
.../windowing/DefaultTriggerTest.java | 130 -----
.../sdk/transforms/windowing/NeverTest.java | 34 +-
.../windowing/OrFinallyTriggerTest.java | 136 -----
.../transforms/windowing/RepeatedlyTest.java | 161 +-----
.../sdk/transforms/windowing/StubTrigger.java | 17 -
.../sdk/transforms/windowing/TriggerTest.java | 28 -
.../beam/sdk/util/ExecutableTriggerTest.java | 18 -
.../sdk/util/FinishedTriggersBitSetTest.java | 55 --
.../sdk/util/FinishedTriggersProperties.java | 110 ----
.../beam/sdk/util/FinishedTriggersSetTest.java | 60 ---
.../beam/sdk/util/ReshuffleTriggerTest.java | 23 -
.../org/apache/beam/sdk/util/TriggerTester.java | 410 ---------------
38 files changed, 77 insertions(+), 3708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
index 0e37d33..c3f0848 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
import org.joda.time.Instant;
/**
@@ -46,27 +45,6 @@ public class AfterAll extends OnceTrigger {
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) {
- // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
- // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
- subTrigger.invokeOnElement(c);
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnMerge(c);
- }
- boolean allFinished = true;
- for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) {
- allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
- }
- c.trigger().setFinished(allFinished);
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger will fire after the latest of its sub-triggers.
Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -84,33 +62,6 @@ public class AfterAll extends OnceTrigger {
return new AfterAll(continuationTriggers);
}
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if all subtriggers return {@code true}.
- */
- @Override
- public boolean shouldFire(TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- if (!context.forTrigger(subtrigger).trigger().isFinished()
- && !subtrigger.invokeShouldFire(context)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
- * because they all must be ready to fire.
- */
- @Override
- public void onOnlyFiring(TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- subtrigger.invokeOnFire(context);
- }
- }
-
@Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterAll.of(");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index 6078b34..9daecb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -18,11 +18,9 @@
package org.apache.beam.sdk.transforms.windowing;
import com.google.common.collect.ImmutableList;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.transforms.Combine;
@@ -31,10 +29,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.joda.time.Duration;
@@ -62,12 +56,6 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
/**
- * To complete an implementation, return the desired time from the TriggerContext.
- */
- @Nullable
- public abstract Instant getCurrentTime(Trigger.TriggerContext context);
-
- /**
* To complete an implementation, return a new instance like this one, but incorporating
* the provided timestamp mapping functions. Generally should be used by calling the
* constructor of this class from the constructor of the subclass.
@@ -92,10 +80,6 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
this.timeDomain = timeDomain;
}
- private Instant getTargetTimestamp(OnElementContext c) {
- return computeTargetTimestamp(c.currentProcessingTime());
- }
-
/**
* The time domain according for which this trigger sets timers.
*/
@@ -170,93 +154,10 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
}
@Override
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
- "prefetch side effect")
- public void prefetchOnElement(StateAccessor<?> state) {
- state.access(DELAYED_UNTIL_TAG).readLater();
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
- Instant oldDelayUntil = delayUntilState.read();
-
- // Since processing time can only advance, resulting in target wake-up times we would
- // ignore anyhow, we don't bother with it if it is already set.
- if (oldDelayUntil != null) {
- return;
- }
-
- Instant targetTimestamp = getTargetTimestamp(c);
- delayUntilState.add(targetTimestamp);
- c.setTimer(targetTimestamp, timeDomain);
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
- super.prefetchOnMerge(state);
- StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // NOTE: We could try to delete all timers which are still active, but we would
- // need access to a timer context for each merging window.
- // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state :
- // c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
- // Instant timestamp = state.get().read();
- // if (timestamp != null) {
- // <context for merging window>.deleteTimer(timestamp, timeDomain);
- // }
- // }
- // Instead let them fire and be ignored.
-
- // If the trigger is already finished, there is no way it will become re-activated
- if (c.trigger().isFinished()) {
- StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
- // NOTE: We do not attempt to delete the timers.
- return;
- }
-
- // Determine the earliest point across all the windows, and delay to that.
- StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
-
- Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
- if (earliestTargetTime != null) {
- c.setTimer(earliestTargetTime, timeDomain);
- }
- }
-
- @Override
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
- "prefetch side effect")
- public void prefetchShouldFire(StateAccessor<?> state) {
- state.access(DELAYED_UNTIL_TAG).readLater();
- }
-
- @Override
- public void clear(TriggerContext c) throws Exception {
- c.state().access(DELAYED_UNTIL_TAG).clear();
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
- return delayedUntil != null
- && getCurrentTime(context) != null
- && getCurrentTime(context).isAfter(delayedUntil);
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
- clear(context);
- }
-
protected Instant computeTargetTimestamp(Instant time) {
Instant result = time;
for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index 961d97f..872ad46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -23,7 +23,6 @@ import com.google.common.base.Joiner;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
import org.joda.time.Instant;
/**
@@ -59,41 +58,6 @@ public class AfterEach extends Trigger {
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- if (!c.trigger().isMerging()) {
- // If merges are not possible, we need only run the first unfinished subtrigger
- c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
- } else {
- // If merges are possible, we need to run all subtriggers in parallel
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- // Even if the subTrigger is done, it may be revived via merging and must have
- // adequate state.
- subTrigger.invokeOnElement(c);
- }
- }
- }
-
- @Override
- public void onMerge(OnMergeContext context) throws Exception {
- // If merging makes a subtrigger no-longer-finished, it will automatically
- // begin participating in shouldFire and onFire appropriately.
-
- // All the following triggers are retroactively "not started" but that is
- // also automatic because they are cleared whenever this trigger
- // fires.
- boolean priorTriggersAllFinished = true;
- for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
- if (priorTriggersAllFinished) {
- subTrigger.invokeOnMerge(context);
- priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
- } else {
- subTrigger.invokeClear(context);
- }
- }
- updateFinishedState(context);
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger will fire at least once when the first trigger in the sequence
// fires at least once.
@@ -106,27 +70,6 @@ public class AfterEach extends Trigger {
}
@Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
- return firstUnfinished.invokeShouldFire(context);
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception {
- context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
-
- // Reset all subtriggers if in a merging context; any may be revived by merging so they are
- // all run in parallel for each pending pane.
- if (context.trigger().isMerging()) {
- for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
- subTrigger.invokeClear(context);
- }
- }
-
- updateFinishedState(context);
- }
-
- @Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
Joiner.on(", ").appendTo(builder, subTriggers);
@@ -134,8 +77,4 @@ public class AfterEach extends Trigger {
return builder.toString();
}
-
- private void updateFinishedState(TriggerContext context) {
- context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
index 7840fc4..a742b43 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
import org.joda.time.Instant;
/**
@@ -47,21 +46,6 @@ public class AfterFirst extends OnceTrigger {
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnElement(c);
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnMerge(c);
- }
- updateFinishedStatus(c);
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger will fire after the earliest of its sub-triggers.
Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
@@ -80,32 +64,6 @@ public class AfterFirst extends OnceTrigger {
}
@Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- if (context.forTrigger(subtrigger).trigger().isFinished()
- || subtrigger.invokeShouldFire(context)) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- protected void onOnlyFiring(TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- TriggerContext subContext = context.forTrigger(subtrigger);
- if (subtrigger.invokeShouldFire(subContext)) {
- // If the trigger is ready to fire, then do whatever it needs to do.
- subtrigger.invokeOnFire(subContext);
- } else {
- // If the trigger is not ready to fire, it is nonetheless true that whatever
- // pending pane it was tracking is now gone.
- subtrigger.invokeClear(subContext);
- }
- }
- }
-
- @Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterFirst.of(");
Joiner.on(", ").appendTo(builder, subTriggers);
@@ -113,12 +71,4 @@ public class AfterFirst extends OnceTrigger {
return builder.toString();
}
-
- private void updateFinishedStatus(TriggerContext c) {
- boolean anyFinished = false;
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
- }
- c.trigger().setFinished(anyFinished);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 4d59d58..4a706e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms.windowing;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Experimental;
@@ -25,9 +24,6 @@ import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.joda.time.Instant;
@@ -65,49 +61,6 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
- super.prefetchOnMerge(state);
- StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
- }
-
- @Override
- public void onMerge(OnMergeContext context) throws Exception {
- // If we've already received enough elements and finished in some window,
- // then this trigger is just finished.
- if (context.trigger().finishedInAnyMergingWindow()) {
- context.trigger().setFinished(true);
- StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
- return;
- }
-
- // Otherwise, compute the sum of elements in all the active panes.
- StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
- }
-
- @Override
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
- "prefetch side effect")
- public void prefetchShouldFire(StateAccessor<?> state) {
- state.access(ELEMENTS_IN_PANE_TAG).readLater();
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
- return count >= countElems;
- }
-
- @Override
- public void clear(TriggerContext c) throws Exception {
- c.state().access(ELEMENTS_IN_PANE_TAG).clear();
- }
-
- @Override
public boolean isCompatible(Trigger other) {
return this.equals(other);
}
@@ -143,9 +96,4 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
public int hashCode() {
return Objects.hash(countElems);
}
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
- clear(context);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
index f551118..09f288e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
import java.util.List;
import java.util.Objects;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.TimeDomain;
@@ -36,12 +35,6 @@ import org.joda.time.Instant;
@Experimental(Experimental.Kind.TRIGGER)
public class AfterProcessingTime extends AfterDelayFromFirstElement {
- @Override
- @Nullable
- public Instant getCurrentTime(Trigger.TriggerContext context) {
- return context.currentProcessingTime();
- }
-
private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) {
super(TimeDomain.PROCESSING_TIME, transforms);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index b96b293..b6258f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing;
import com.google.common.base.Objects;
import java.util.Collections;
import java.util.List;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.TimeDomain;
import org.joda.time.Instant;
@@ -31,12 +30,6 @@ import org.joda.time.Instant;
*/
public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
- @Override
- @Nullable
- public Instant getCurrentTime(Trigger.TriggerContext context) {
- return context.currentSynchronizedProcessingTime();
- }
-
public AfterSynchronizedProcessingTime() {
super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
Collections.<SerializableFunction<Instant, Instant>>emptyList());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index 89c1ba9..37b73a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -25,7 +25,6 @@ import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
import org.apache.beam.sdk.util.TimeDomain;
import org.joda.time.Instant;
@@ -111,50 +110,6 @@ public class AfterWatermark {
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- if (!c.trigger().isMerging()) {
- // If merges can never happen, we just run the unfinished subtrigger
- c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
- } else {
- // If merges can happen, we run for all subtriggers because they might be
- // de-activated or re-activated
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnElement(c);
- }
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
- // merged-away windows.
-
- ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
- // We check the early trigger to determine if we are still processing it or
- // if the end of window has transitioned us to the late trigger
- OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
-
- // If the early trigger is still active in any merging window then it is still active in
- // the new merged window, because even if the merged window is "done" some pending elements
- // haven't had a chance to fire.
- if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) {
- earlyContext.trigger().setFinished(false);
- if (lateTrigger != null) {
- ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX);
- OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
- lateContext.trigger().setFinished(false);
- lateSubtrigger.invokeClear(lateContext);
- }
- } else {
- // Otherwise the early trigger and end-of-window bit is done for good.
- earlyContext.trigger().setFinished(true);
- if (lateTrigger != null) {
- c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
- }
- }
- }
-
- @Override
public Trigger getContinuationTrigger() {
return new AfterWatermarkEarlyAndLate(
earlyTrigger.getContinuationTrigger(),
@@ -173,38 +128,6 @@ public class AfterWatermark {
return window.maxTimestamp();
}
- private boolean endOfWindowReached(Trigger.TriggerContext context) {
- return context.currentEventTime() != null
- && context.currentEventTime().isAfter(context.window().maxTimestamp());
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- if (!context.trigger().isFinished(EARLY_INDEX)) {
- // We have not yet transitioned to late firings.
- // We should fire if either the trigger is ready or we reach the end of the window.
- return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
- || endOfWindowReached(context);
- } else if (lateTrigger == null) {
- return false;
- } else {
- // We are running the late trigger
- return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
- }
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception {
- if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
- onNonLateFiring(context);
- } else if (lateTrigger != null) {
- onLateFiring(context);
- } else {
- // all done
- context.trigger().setFinished(true);
- }
- }
-
@Override
public String toString() {
StringBuilder builder = new StringBuilder(TO_STRING);
@@ -225,47 +148,6 @@ public class AfterWatermark {
return builder.toString();
}
-
- private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
- // We have not yet transitioned to late firings.
- ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
- Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
-
- if (!endOfWindowReached(context)) {
- // This is an early firing, since we have not arrived at the end of the window
- // Implicitly repeats
- earlySubtrigger.invokeOnFire(context);
- earlySubtrigger.invokeClear(context);
- earlyContext.trigger().setFinished(false);
- } else {
- // We have arrived at the end of the window; terminate the early trigger
- // and clear out the late trigger's state
- if (earlySubtrigger.invokeShouldFire(context)) {
- earlySubtrigger.invokeOnFire(context);
- }
- earlyContext.trigger().setFinished(true);
- earlySubtrigger.invokeClear(context);
-
- if (lateTrigger == null) {
- // Done if there is no late trigger.
- context.trigger().setFinished(true);
- } else {
- // If there is a late trigger, we transition to it, and need to clear its state
- // because it was run in parallel.
- context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
- }
- }
-
- }
-
- private void onLateFiring(Trigger.TriggerContext context) throws Exception {
- // We are firing the late trigger, with implicit repeat
- ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
- lateSubtrigger.invokeOnFire(context);
- // It is a OnceTrigger, so it must have finished; unfinished it and clear it
- lateSubtrigger.invokeClear(context);
- context.forTrigger(lateSubtrigger).trigger().setFinished(false);
- }
}
/**
@@ -296,33 +178,6 @@ public class AfterWatermark {
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- // We're interested in knowing when the input watermark passes the end of the window.
- // (It is possible this has already happened, in which case the timer will be fired
- // almost immediately).
- c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
- // merged-away windows.
-
- if (!c.trigger().finishedInAllMergingWindows()) {
- // If the trigger is still active in any merging window then it is still active in the new
- // merged window, because even if the merged window is "done" some pending elements haven't
- // had a chance to fire
- c.trigger().setFinished(false);
- } else if (!endOfWindowReached(c)) {
- // If the end of the new window has not been reached, then the trigger is active again.
- c.trigger().setFinished(false);
- } else {
- // Otherwise it is done for good
- c.trigger().setFinished(true);
- }
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return window.maxTimestamp();
}
@@ -346,18 +201,5 @@ public class AfterWatermark {
public int hashCode() {
return Objects.hash(getClass());
}
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return endOfWindowReached(context);
- }
-
- private boolean endOfWindowReached(Trigger.TriggerContext context) {
- return context.currentEventTime() != null
- && context.currentEventTime().isAfter(context.window().maxTimestamp());
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
index fee7cdf..a649b4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.TimeDomain;
import org.joda.time.Instant;
/**
@@ -41,27 +40,6 @@ public class DefaultTrigger extends Trigger{
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- // If the end of the window has already been reached, then we are already ready to fire
- // and do not need to set a wake-up timer.
- if (!endOfWindowReached(c)) {
- c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // If the end of the window has already been reached, then we are already ready to fire
- // and do not need to set a wake-up timer.
- if (!endOfWindowReached(c)) {
- c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
- }
- }
-
- @Override
- public void clear(TriggerContext c) throws Exception { }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return window.maxTimestamp();
}
@@ -76,17 +54,4 @@ public class DefaultTrigger extends Trigger{
public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
}
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return endOfWindowReached(context);
- }
-
- private boolean endOfWindowReached(Trigger.TriggerContext context) {
- return context.currentEventTime() != null
- && context.currentEventTime().isAfter(context.window().maxTimestamp());
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 07b70f4..664ae83 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -48,12 +48,6 @@ public final class Never {
}
@Override
- public void onElement(OnElementContext c) {}
-
- @Override
- public void onMerge(OnMergeContext c) {}
-
- @Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
}
@@ -62,16 +56,5 @@ public final class Never {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) {
- return false;
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) {
- throw new UnsupportedOperationException(
- String.format("%s should never fire", getClass().getSimpleName()));
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 9bef45a..1ed9b55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
import org.joda.time.Instant;
/**
@@ -51,20 +50,6 @@ public class OrFinallyTrigger extends Trigger {
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
- c.trigger().subTrigger(UNTIL).invokeOnElement(c);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnMerge(c);
- }
- updateFinishedState(c);
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger fires once either the trigger or the until trigger fires.
Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
@@ -83,38 +68,7 @@ public class OrFinallyTrigger extends Trigger {
}
@Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
- || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception {
- ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL);
- ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL);
-
- if (untilSubtrigger.invokeShouldFire(context)) {
- untilSubtrigger.invokeOnFire(context);
- actualSubtrigger.invokeClear(context);
- } else {
- // If until didn't fire, then the actual must have (or it is forbidden to call
- // onFire) so we are done only if actual is done.
- actualSubtrigger.invokeOnFire(context);
- // Do not clear the until trigger, because it tracks data cross firings.
- }
- updateFinishedState(context);
- }
-
- @Override
public String toString() {
return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
}
-
- private void updateFinishedState(TriggerContext c) throws Exception {
- boolean anyStillFinished = false;
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
- }
- c.trigger().setFinished(anyStillFinished);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
index 45bc6c1..4d79a2c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
import org.joda.time.Instant;
/**
@@ -61,16 +60,6 @@ public class Repeatedly extends Trigger {
}
@Override
- public void onElement(OnElementContext c) throws Exception {
- getRepeated(c).invokeOnElement(c);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- getRepeated(c).invokeOnMerge(c);
- }
-
- @Override
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger fires once the repeated trigger fires.
return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
@@ -82,26 +71,7 @@ public class Repeatedly extends Trigger {
}
@Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return getRepeated(context).invokeShouldFire(context);
- }
-
- @Override
- public void onFire(TriggerContext context) throws Exception {
- getRepeated(context).invokeOnFire(context);
-
- if (context.trigger().isFinished(REPEATED)) {
- // Reset tree will recursively clear the finished bits, and invoke clear.
- context.forTrigger(getRepeated(context)).trigger().resetTree();
- }
- }
-
- @Override
public String toString() {
return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
}
-
- private ExecutableTrigger getRepeated(TriggerContext context) {
- return context.trigger().subTrigger(REPEATED);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index 18b7a62..1cc807e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -23,22 +23,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.joda.time.Instant;
/**
- * {@code Trigger}s control when the elements for a specific key and window are output. As elements
- * arrive, they are put into one or more windows by a {@link Window} transform and its associated
- * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the
- * {@code Window}s contents should be output.
+ * {@link Trigger Triggers} control when the elements for a specific key and window are output. As
+ * elements arrive, they are put into one or more windows by a {@link Window} transform and its
+ * associated {@link WindowFn}, and then passed to the associated {@link Trigger} to determine if
+ * the {@link BoundedWindow Window's} contents should be output.
*
- * <p>See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window}
- * for more information about how grouping with windows works.
+ * <p>See {@link GroupByKey} and {@link Window} for more information about how grouping with windows
+ * works.
*
* <p>The elements that are assigned to a window since the last time it was fired (or since the
* window was created) are placed into the current window pane. Triggers are evaluated against the
@@ -46,224 +42,34 @@ import org.joda.time.Instant;
* output. When the root trigger finishes (indicating it will never fire again), the window is
* closed and any new elements assigned to that window are discarded.
*
- * <p>Several predefined {@code Trigger}s are provided:
+ * <p>Several predefined {@link Trigger Triggers} are provided:
+ *
* <ul>
- * <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
- * either the end of the window or the arrival of the first element in a pane.
- * <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
- * (typically since the first element in a pane).
- * <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as
- * the number of elements that have been assigned to the current pane.
+ * <li> {@link AfterWatermark} for firing when the watermark passes a timestamp determined from
+ * either the end of the window or the arrival of the first element in a pane.
+ * <li> {@link AfterProcessingTime} for firing after some amount of processing time has elapsed
+ * (typically since the first element in a pane).
+ * <li> {@link AfterPane} for firing off a property of the elements in the current pane, such as the
+ * number of elements that have been assigned to the current pane.
* </ul>
*
* <p>In addition, {@code Trigger}s can be combined in a variety of ways:
- * <ul>
- * <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its
- * argument finishes it gets reset and starts over. Can be combined with
- * {@link Trigger#orFinally} to specify a condition that causes the repetition to stop.
- * <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every)
- * time that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
- * <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
- * fires. An {@link AfterFirst} trigger finishes after it fires once.
- * <li> {@link AfterAll#of} to create a trigger that fires after all least one of its arguments
- * have fired at least once. An {@link AfterAll} trigger finishes after it fires once.
- * </ul>
*
- * <p>Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one
- * of the following states:
* <ul>
- * <li> Never Existed - before the trigger has started executing, there is no state associated
- * with it anywhere in the system. A trigger moves to the executing state as soon as it
- * processes in the current pane.
- * <li> Executing - while the trigger is receiving items and may fire. While it is in this state,
- * it may persist book-keeping information to persisted state, set timers, etc.
- * <li> Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the
- * system remembers only that it is finished. Entering this state causes us to discard any
- * elements in the buffer for that window, as well.
+ * <li> {@link Repeatedly#forever} to create a trigger that executes forever. Any time its argument
+ * finishes it gets reset and starts over. Can be combined with {@link Trigger#orFinally} to
+ * specify a condition that causes the repetition to stop.
+ * <li> {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every) time
+ * that a trigger fires, and advancing to the next trigger in the sequence when it finishes.
+ * <li> {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments
+ * fires. An {@link AfterFirst} trigger finishes after it fires once.
+ * <li> {@link AfterAll#of} to create a trigger that fires after all least one of its arguments have
+ * fired at least once. An {@link AfterAll} trigger finishes after it fires once.
* </ul>
- *
- * <p>Once finished, a trigger cannot return itself back to an earlier state, however a composite
- * trigger could reset its sub-triggers.
- *
- * <p>Triggers should not build up any state internally since they may be recreated
- * between invocations of the callbacks. All important values should be persisted using
- * state before the callback returns.
*/
@Experimental(Experimental.Kind.TRIGGER)
public abstract class Trigger implements Serializable {
- /**
- * Interface for accessing information about the trigger being executed and other triggers in the
- * same tree.
- */
- public interface TriggerInfo {
-
- /**
- * Returns true if the windowing strategy of the current {@code PCollection} is a merging
- * WindowFn. If true, the trigger execution needs to keep enough information to support the
- * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will
- * never be called.
- */
- boolean isMerging();
-
- /**
- * Access the executable versions of the sub-triggers of the current trigger.
- */
- Iterable<ExecutableTrigger> subTriggers();
-
- /**
- * Access the executable version of the specified sub-trigger.
- */
- ExecutableTrigger subTrigger(int subtriggerIndex);
-
- /**
- * Returns true if the current trigger is marked finished.
- */
- boolean isFinished();
-
- /**
- * Return true if the given subtrigger is marked finished.
- */
- boolean isFinished(int subtriggerIndex);
-
- /**
- * Returns true if all the sub-triggers of the current trigger are marked finished.
- */
- boolean areAllSubtriggersFinished();
-
- /**
- * Returns an iterable over the unfinished sub-triggers of the current trigger.
- */
- Iterable<ExecutableTrigger> unfinishedSubTriggers();
-
- /**
- * Returns the first unfinished sub-trigger.
- */
- ExecutableTrigger firstUnfinishedSubTrigger();
-
- /**
- * Clears all keyed state for triggers in the current sub-tree and unsets all the associated
- * finished bits.
- */
- void resetTree() throws Exception;
-
- /**
- * Sets the finished bit for the current trigger.
- */
- void setFinished(boolean finished);
-
- /**
- * Sets the finished bit for the given sub-trigger.
- */
- void setFinished(boolean finished, int subTriggerIndex);
- }
-
- /**
- * Interact with properties of the trigger being executed, with extensions to deal with the
- * merging windows.
- */
- public interface MergingTriggerInfo extends TriggerInfo {
-
- /** Return true if the trigger is finished in any window being merged. */
- boolean finishedInAnyMergingWindow();
-
- /** Return true if the trigger is finished in all windows being merged. */
- boolean finishedInAllMergingWindows();
- }
-
- /**
- * Information accessible to all operational hooks in this {@code Trigger}.
- *
- * <p>Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and
- * extended with additional information in other methods.
- */
- public abstract class TriggerContext {
-
- /** Returns the interface for accessing trigger info. */
- public abstract TriggerInfo trigger();
-
- /** Returns the interface for accessing persistent state. */
- public abstract StateAccessor<?> state();
-
- /** The window that the current context is executing in. */
- public abstract BoundedWindow window();
-
- /** Create a sub-context for the given sub-trigger. */
- public abstract TriggerContext forTrigger(ExecutableTrigger trigger);
-
- /**
- * Removes the timer set in this trigger context for the given {@link Instant}
- * and {@link TimeDomain}.
- */
- public abstract void deleteTimer(Instant timestamp, TimeDomain domain);
-
- /** The current processing time. */
- public abstract Instant currentProcessingTime();
-
- /** The current synchronized upstream processing time or {@code null} if unknown. */
- @Nullable
- public abstract Instant currentSynchronizedProcessingTime();
-
- /** The current event time for the input or {@code null} if unknown. */
- @Nullable
- public abstract Instant currentEventTime();
- }
-
- /**
- * Extended {@link TriggerContext} containing information accessible to the {@link #onElement}
- * operational hook.
- */
- public abstract class OnElementContext extends TriggerContext {
- /** The event timestamp of the element currently being processed. */
- public abstract Instant eventTimestamp();
-
- /**
- * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
- * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
- *
- * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
- * timer firings for a window will be received, but the implementation should choose to ignore
- * those that are not applicable.
- *
- * @param timestamp the time at which the trigger should be re-evaluated
- * @param domain the domain that the {@code timestamp} applies to
- */
- public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
- /** Create an {@code OnElementContext} for executing the given trigger. */
- @Override
- public abstract OnElementContext forTrigger(ExecutableTrigger trigger);
- }
-
- /**
- * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge}
- * operational hook.
- */
- public abstract class OnMergeContext extends TriggerContext {
- /**
- * Sets a timer to fire when the watermark or processing time is beyond the given timestamp.
- * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards.
- *
- * <p>As with {@link #state}, timers are implicitly scoped to the current window. All
- * timer firings for a window will be received, but the implementation should choose to ignore
- * those that are not applicable.
- *
- * @param timestamp the time at which the trigger should be re-evaluated
- * @param domain the domain that the {@code timestamp} applies to
- */
- public abstract void setTimer(Instant timestamp, TimeDomain domain);
-
- /** Create an {@code OnMergeContext} for executing the given trigger. */
- @Override
- public abstract OnMergeContext forTrigger(ExecutableTrigger trigger);
-
- @Override
- public abstract MergingStateAccessor<?, ?> state();
-
- @Override
- public abstract MergingTriggerInfo trigger();
- }
-
protected final List<Trigger> subTriggers;
protected Trigger(List<Trigger> subTriggers) {
@@ -274,114 +80,14 @@ public abstract class Trigger implements Serializable {
this(Collections.EMPTY_LIST);
}
- /**
- * Called every time an element is incorporated into a window.
- */
- public abstract void onElement(OnElementContext c) throws Exception;
-
- /**
- * Called immediately after windows have been merged.
- *
- * <p>Leaf triggers should update their state by inspecting their status and any state
- * in the merging windows. Composite triggers should update their state by calling
- * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic.
- *
- * <p>A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished;
- * it is the responsibility of the trigger itself to record this fact. It is forbidden for
- * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending
- * elements that led to it being ready to fire.
- *
- * <p>The implementation does not need to clear out any state associated with the old windows.
- */
- public abstract void onMerge(OnMergeContext c) throws Exception;
-
- /**
- * Returns {@code true} if the current state of the trigger indicates that its condition
- * is satisfied and it is ready to fire.
- */
- public abstract boolean shouldFire(TriggerContext context) throws Exception;
-
- /**
- * Adjusts the state of the trigger to be ready for the next pane. For example, a
- * {@link Repeatedly} trigger will reset its inner trigger, since it has fired.
- *
- * <p>If the trigger is finished, it is the responsibility of the trigger itself to
- * record that fact via the {@code context}.
- */
- public abstract void onFire(TriggerContext context) throws Exception;
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #onElement} call.
- */
- public void prefetchOnElement(StateAccessor<?> state) {
- if (subTriggers != null) {
- for (Trigger trigger : subTriggers) {
- trigger.prefetchOnElement(state);
- }
- }
- }
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #onMerge} call.
- */
- public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
- if (subTriggers != null) {
- for (Trigger trigger : subTriggers) {
- trigger.prefetchOnMerge(state);
- }
- }
- }
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #shouldFire} call.
- */
- public void prefetchShouldFire(StateAccessor<?> state) {
- if (subTriggers != null) {
- for (Trigger trigger : subTriggers) {
- trigger.prefetchShouldFire(state);
- }
- }
- }
-
- /**
- * Called to allow the trigger to prefetch any state it will likely need to read from during
- * an {@link #onFire} call.
- */
- public void prefetchOnFire(StateAccessor<?> state) {
- if (subTriggers != null) {
- for (Trigger trigger : subTriggers) {
- trigger.prefetchOnFire(state);
- }
- }
- }
-
- /**
- * Clear any state associated with this trigger in the given window.
- *
- * <p>This is called after a trigger has indicated it will never fire again. The trigger system
- * keeps enough information to know that the trigger is finished, so this trigger should clear all
- * of its state.
- */
- public void clear(TriggerContext c) throws Exception {
- if (subTriggers != null) {
- for (ExecutableTrigger trigger : c.trigger().subTriggers()) {
- trigger.invokeClear(c);
- }
- }
- }
-
public List<Trigger> subTriggers() {
return subTriggers;
}
/**
- * Return a trigger to use after a {@code GroupByKey} to preserve the
- * intention of this trigger. Specifically, triggers that are time based
- * and intended to provide speculative results should continue providing
- * speculative results. Triggers that fire once (or multiple times) should
+ * Return a trigger to use after a {@link GroupByKey} to preserve the intention of this trigger.
+ * Specifically, triggers that are time based and intended to provide speculative results should
+ * continue providing speculative results. Triggers that fire once (or multiple times) should
* continue firing once (or multiple times).
*/
public Trigger getContinuationTrigger() {
@@ -397,27 +103,24 @@ public abstract class Trigger implements Serializable {
}
/**
- * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this
- * is provided the continuation trigger of each of the sub-triggers.
+ * Return the {@link #getContinuationTrigger} of this {@link Trigger}. For convenience, this is
+ * provided the continuation trigger of each of the sub-triggers.
*/
protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers);
/**
- * Returns a bound in watermark time by which this trigger would have fired at least once
- * for a given window had there been input data. This is a static property of a trigger
- * that does not depend on its state.
+ * Returns a bound in event time by which this trigger would have fired at least once for a given
+ * window had there been input data.
*
- * <p>For triggers that do not fire based on the watermark advancing, returns
- * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ * <p>For triggers that do not fire based on the watermark advancing, returns {@link
+ * BoundedWindow#TIMESTAMP_MAX_VALUE}.
*
- * <p>This estimate is used to determine that there are no elements in a side-input window, which
- * causes the default value to be used instead.
+ * <p>This estimate may be used, for example, to determine that there are no elements in a
+ * side-input window, which causes the default value to be used instead.
*/
public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window);
- /**
- * Returns whether this performs the same triggering as the given {@code Trigger}.
- */
+ /** Returns whether this performs the same triggering as the given {@link Trigger}. */
public boolean isCompatible(Trigger other) {
if (!getClass().equals(other.getClass())) {
return false;
@@ -472,31 +175,33 @@ public abstract class Trigger implements Serializable {
}
/**
- * Specify an ending condition for this trigger. If the {@code until} fires then the combination
- * fires.
+ * Specify an ending condition for this trigger. If the {@code until} {@link Trigger} fires then
+ * the combination fires.
*
- * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes
- * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
- * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that
- * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2}
- * has seen are necessarily in the current pane.
+ * <p>The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes as
+ * soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time
+ * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that {@code
+ * t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} has
+ * seen are necessarily in the current pane.
*
* <p>For example the final firing of the following trigger may only have 1 element:
- * <pre> {@code
+ *
+ * <pre>{@code
* Repeatedly.forever(AfterPane.elementCountAtLeast(2))
* .orFinally(AfterPane.elementCountAtLeast(5))
- * } </pre>
+ * }
+ * </pre>
*
- * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
- * as {@code AfterFirst.of(t1, t2)}.
+ * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same as
+ * {@code AfterFirst.of(t1, t2)}.
*/
public OrFinallyTrigger orFinally(OnceTrigger until) {
return new OrFinallyTrigger(this, until);
}
/**
- * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather
- * than the general {@link Trigger} class to indicate that behavior.
+ * {@link Trigger Triggers} that are guaranteed to fire at most once should extend {@link
+ * OnceTrigger} rather than the general {@link Trigger} class to indicate that behavior.
*/
public abstract static class OnceTrigger extends Trigger {
protected OnceTrigger(List<Trigger> subTriggers) {
@@ -511,20 +216,5 @@ public abstract class Trigger implements Serializable {
}
return (OnceTrigger) continuation;
}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final void onFire(TriggerContext context) throws Exception {
- onOnlyFiring(context);
- context.trigger().setFinished(true);
- }
-
- /**
- * Called exactly once by {@link #onFire} when the trigger is fired. By default,
- * invokes {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}.
- */
- protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
index 088c499..48a49aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
@@ -29,9 +29,13 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
/**
* A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the
- * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
+ * times (both in the same trigger expression and in other trigger expressions), the {@code
+ * ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
+ *
+ * @deprecated uses of {@link ExecutableTrigger} should be ported to
+ * org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.
*/
+@Deprecated
public class ExecutableTrigger implements Serializable {
/** Store the index assigned to this trigger. */
@@ -115,38 +119,6 @@ public class ExecutableTrigger implements Serializable {
}
/**
- * Invoke the {@link Trigger#onElement} method for this trigger, ensuring that the bits are
- * properly updated if the trigger finishes.
- */
- public void invokeOnElement(Trigger.OnElementContext c) throws Exception {
- trigger.onElement(c.forTrigger(this));
- }
-
- /**
- * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that the bits are properly
- * updated.
- */
- public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception {
- Trigger.OnMergeContext subContext = c.forTrigger(this);
- trigger.onMerge(subContext);
- }
-
- public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception {
- return trigger.shouldFire(c.forTrigger(this));
- }
-
- public void invokeOnFire(Trigger.TriggerContext c) throws Exception {
- trigger.onFire(c.forTrigger(this));
- }
-
- /**
- * Invoke clear for the current this trigger.
- */
- public void invokeClear(Trigger.TriggerContext c) throws Exception {
- trigger.clear(c.forTrigger(this));
- }
-
- /**
* {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH
* and never just FIRE.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
deleted file mode 100644
index ea14c40..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-/**
- * A mutable set which tracks whether any particular {@link ExecutableTrigger} is
- * finished.
- */
-public interface FinishedTriggers {
- /**
- * Returns {@code true} if the trigger is finished.
- */
- boolean isFinished(ExecutableTrigger trigger);
-
- /**
- * Sets the fact that the trigger is finished.
- */
- void setFinished(ExecutableTrigger trigger, boolean value);
-
- /**
- * Sets the trigger and all of its subtriggers to unfinished.
- */
- void clearRecursively(ExecutableTrigger trigger);
-
- /**
- * Create an independent copy of this mutable {@link FinishedTriggers}.
- */
- FinishedTriggers copy();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
deleted file mode 100644
index 4cd617f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.BitSet;
-
-/**
- * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}.
- */
-public class FinishedTriggersBitSet implements FinishedTriggers {
-
- private final BitSet bitSet;
-
- private FinishedTriggersBitSet(BitSet bitSet) {
- this.bitSet = bitSet;
- }
-
- public static FinishedTriggersBitSet emptyWithCapacity(int capacity) {
- return new FinishedTriggersBitSet(new BitSet(capacity));
- }
-
- public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) {
- return new FinishedTriggersBitSet(bitSet);
- }
-
- /**
- * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}.
- */
- public BitSet getBitSet() {
- return bitSet;
- }
-
- @Override
- public boolean isFinished(ExecutableTrigger trigger) {
- return bitSet.get(trigger.getTriggerIndex());
- }
-
- @Override
- public void setFinished(ExecutableTrigger trigger, boolean value) {
- bitSet.set(trigger.getTriggerIndex(), value);
- }
-
- @Override
- public void clearRecursively(ExecutableTrigger trigger) {
- bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree());
- }
-
- @Override
- public FinishedTriggersBitSet copy() {
- return new FinishedTriggersBitSet((BitSet) bitSet.clone());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
deleted file mode 100644
index a9feb73..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.collect.Sets;
-import java.util.Set;
-
-/**
- * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}.
- */
-public class FinishedTriggersSet implements FinishedTriggers {
-
- private final Set<ExecutableTrigger> finishedTriggers;
-
- private FinishedTriggersSet(Set<ExecutableTrigger> finishedTriggers) {
- this.finishedTriggers = finishedTriggers;
- }
-
- public static FinishedTriggersSet fromSet(Set<ExecutableTrigger> finishedTriggers) {
- return new FinishedTriggersSet(finishedTriggers);
- }
-
- /**
- * Returns a mutable {@link Set} of the underlying triggers that are finished.
- */
- public Set<ExecutableTrigger> getFinishedTriggers() {
- return finishedTriggers;
- }
-
- @Override
- public boolean isFinished(ExecutableTrigger trigger) {
- return finishedTriggers.contains(trigger);
- }
-
- @Override
- public void setFinished(ExecutableTrigger trigger, boolean value) {
- if (value) {
- finishedTriggers.add(trigger);
- } else {
- finishedTriggers.remove(trigger);
- }
- }
-
- @Override
- public void clearRecursively(ExecutableTrigger trigger) {
- finishedTriggers.remove(trigger);
- for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
- clearRecursively(subTrigger);
- }
- }
-
- @Override
- public FinishedTriggersSet copy() {
- return fromSet(Sets.newHashSet(finishedTriggers));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
index 437f14a..8dd648a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
@@ -35,12 +35,6 @@ public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
}
@Override
- public void onElement(Trigger.OnElementContext c) { }
-
- @Override
- public void onMerge(Trigger.OnMergeContext c) { }
-
- @Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
}
@@ -52,14 +46,6 @@ public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
}
@Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return true;
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
-
- @Override
public String toString() {
return "ReshuffleTrigger()";
}