You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:50 UTC
[16/17] incubator-beam git commit: Restore prior trigger files,
for temporary compatibility
Restore prior trigger files, for temporary compatibility
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a64acb2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a64acb2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a64acb2f
Branch: refs/heads/master
Commit: a64acb2f84ac26bd1a3f297085477f13b0252570
Parents: 69b1efd
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 11 21:35:02 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 13 14:34:35 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/TriggerRunner.java | 247 +++++++++
.../beam/sdk/transforms/windowing/AfterAll.java | 122 +++++
.../windowing/AfterDelayFromFirstElement.java | 335 ++++++++++++
.../sdk/transforms/windowing/AfterEach.java | 141 +++++
.../sdk/transforms/windowing/AfterFirst.java | 124 +++++
.../sdk/transforms/windowing/AfterPane.java | 144 +++++
.../windowing/AfterProcessingTime.java | 102 ++++
.../AfterSynchronizedProcessingTime.java | 73 +++
.../transforms/windowing/AfterWatermark.java | 355 +++++++++++++
.../transforms/windowing/DefaultTrigger.java | 92 ++++
.../beam/sdk/transforms/windowing/Never.java | 75 +++
.../transforms/windowing/OrFinallyTrigger.java | 105 ++++
.../sdk/transforms/windowing/Repeatedly.java | 101 ++++
.../beam/sdk/transforms/windowing/Trigger.java | 527 +++++++++++++++++++
.../apache/beam/sdk/util/ExecutableTrigger.java | 159 ++++++
.../apache/beam/sdk/util/FinishedTriggers.java | 44 ++
.../beam/sdk/util/FinishedTriggersBitSet.java | 67 +++
.../beam/sdk/util/FinishedTriggersSet.java | 72 +++
.../apache/beam/sdk/util/ReshuffleTrigger.java | 66 +++
.../beam/sdk/util/TriggerContextFactory.java | 507 ++++++++++++++++++
.../sdk/transforms/windowing/AfterAllTest.java | 156 ++++++
.../sdk/transforms/windowing/AfterEachTest.java | 132 +++++
.../transforms/windowing/AfterFirstTest.java | 181 +++++++
.../sdk/transforms/windowing/AfterPaneTest.java | 132 +++++
.../windowing/AfterProcessingTimeTest.java | 187 +++++++
.../AfterSynchronizedProcessingTimeTest.java | 121 +++++
.../windowing/AfterWatermarkTest.java | 380 +++++++++++++
.../windowing/DefaultTriggerTest.java | 176 +++++++
.../sdk/transforms/windowing/NeverTest.java | 56 ++
.../windowing/OrFinallyTriggerTest.java | 215 ++++++++
.../transforms/windowing/RepeatedlyTest.java | 224 ++++++++
.../sdk/transforms/windowing/StubTrigger.java | 70 +++
.../sdk/transforms/windowing/TriggerTest.java | 118 +++++
.../org/apache/beam/sdk/util/TriggerTester.java | 410 +++++++++++++++
34 files changed, 6016 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
new file mode 100644
index 0000000..8d0f322
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.BitSetCoder;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.apache.beam.sdk.util.FinishedTriggers;
+import org.apache.beam.sdk.util.FinishedTriggersBitSet;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.TriggerContextFactory;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
+
+/**
+ * Executes a trigger while managing persistence of information about which subtriggers are
+ * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
+ *
+ * <p>Specifically, the responsibilities are:
+ *
+ * <ul>
+ * <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
+ * constructing the appropriate trigger contexts.</li>
+ * <li>Committing a record of which subtriggers are finished to persistent state.</li>
+ * <li>Restoring the record of which subtriggers are finished from persistent state.</li>
+ * <li>Clearing out the persisted finished set when a caller indicates
+ * (via {#link #clearFinished}) that it is no longer needed.</li>
+ * </ul>
+ *
+ * <p>These responsibilities are intertwined: trigger contexts include mutable information about
+ * which subtriggers are finished. This class provides the information when building the contexts
+ * and commits the information when the method of the {@link ExecutableTrigger} returns.
+ *
+ * @param <W> The kind of windows being processed.
+ */
+public class TriggerRunner<W extends BoundedWindow> {
+ @VisibleForTesting
+ static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+ StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
+
+ private final ExecutableTrigger rootTrigger;
+ private final TriggerContextFactory<W> contextFactory;
+
+ public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
+ checkState(rootTrigger.getTriggerIndex() == 0);
+ this.rootTrigger = rootTrigger;
+ this.contextFactory = contextFactory;
+ }
+
+ private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
+ if (!isFinishedSetNeeded()) {
+ // If no trigger in the tree will ever have finished bits, then we don't need to read them.
+ // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
+ // finished) for each trigger in the tree.
+ return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+ }
+
+ BitSet bitSet = state.read();
+ return bitSet == null
+ ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
+ : FinishedTriggersBitSet.fromBitSet(bitSet);
+ }
+
+
+ private void clearFinishedBits(ValueState<BitSet> state) {
+ if (!isFinishedSetNeeded()) {
+ // Nothing to clear.
+ return;
+ }
+ state.clear();
+ }
+
+ /** Return true if the trigger is closed in the window corresponding to the specified state. */
+ public boolean isClosed(StateAccessor<?> state) {
+ return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
+ }
+
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+ justification = "prefetch side effect")
+ public void prefetchForValue(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchOnElement(
+ contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+ justification = "prefetch side effect")
+ public void prefetchOnFire(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+ justification = "prefetch side effect")
+ public void prefetchShouldFire(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchShouldFire(
+ contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ /**
+ * Run the trigger logic to deal with a new value.
+ */
+ public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
+ throws Exception {
+ // Clone so that we can detect changes and so that changes here don't pollute merging.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
+ window, timers, timestamp, rootTrigger, finishedSet);
+ rootTrigger.invokeOnElement(triggerContext);
+ persistFinishedSet(state, finishedSet);
+ }
+
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+ justification = "prefetch side effect")
+ public void prefetchForMerge(
+ W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
+ if (isFinishedSetNeeded()) {
+ for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
+ value.readLater();
+ }
+ }
+ rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
+ window, mergingWindows, rootTrigger));
+ }
+
+ /**
+ * Run the trigger merging logic as part of executing the specified merge.
+ */
+ public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
+ // Clone so that we can detect changes and so that changes here don't pollute merging.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+
+ // And read the finished bits in each merging window.
+ ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
+ for (Map.Entry<W, ValueState<BitSet>> entry :
+ state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
+ // Don't need to clone these, since the trigger context doesn't allow modification
+ builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
+ // Clear the underlying finished bits.
+ clearFinishedBits(entry.getValue());
+ }
+ ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
+
+ Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
+ window, timers, rootTrigger, finishedSet, mergingFinishedSets);
+
+ // Run the merge from the trigger
+ rootTrigger.invokeOnMerge(mergeContext);
+
+ persistFinishedSet(state, finishedSet);
+ }
+
+ public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ Trigger.TriggerContext context = contextFactory.base(window, timers,
+ rootTrigger, finishedSet);
+ return rootTrigger.invokeShouldFire(context);
+ }
+
+ public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ // shouldFire should be false.
+ // However it is too expensive to assert.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ Trigger.TriggerContext context = contextFactory.base(window, timers,
+ rootTrigger, finishedSet);
+ rootTrigger.invokeOnFire(context);
+ persistFinishedSet(state, finishedSet);
+ }
+
+ private void persistFinishedSet(
+ StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+ if (!isFinishedSetNeeded()) {
+ return;
+ }
+
+ ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+ if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
+ if (modifiedFinishedSet.getBitSet().isEmpty()) {
+ finishedSetState.clear();
+ } else {
+ finishedSetState.write(modifiedFinishedSet.getBitSet());
+ }
+ }
+ }
+
+ /**
+ * Clear the finished bits.
+ */
+ public void clearFinished(StateAccessor<?> state) {
+ clearFinishedBits(state.access(FINISHED_BITS_TAG));
+ }
+
+ /**
+ * Clear the state used for executing triggers, but leave the finished set to indicate
+ * the window is closed.
+ */
+ public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ // Don't need to clone, because we'll be clearing the finished bits anyways.
+ FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
+ rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
+ }
+
+ private boolean isFinishedSetNeeded() {
+ // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
+ // lookup. Right now, we special case this for the DefaultTrigger.
+ return !(rootTrigger.getSpec() instanceof DefaultTrigger);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
new file mode 100644
index 0000000..cc8c97f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterAll extends OnceTrigger {
+
+ private AfterAll(List<Trigger> subTriggers) {
+ super(subTriggers);
+ checkArgument(subTriggers.size() > 1);
+ }
+
+ /**
+ * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
+ */
+ public static OnceTrigger of(OnceTrigger... triggers) {
+ return new AfterAll(Arrays.<Trigger>asList(triggers));
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) {
+ // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
+ // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
+ subTrigger.invokeOnElement(c);
+ }
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ subTrigger.invokeOnMerge(c);
+ }
+ boolean allFinished = true;
+ for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) {
+ allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
+ }
+ c.trigger().setFinished(allFinished);
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ // This trigger will fire after the latest of its sub-triggers.
+ Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ for (Trigger subTrigger : subTriggers) {
+ Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
+ if (deadline.isBefore(subDeadline)) {
+ deadline = subDeadline;
+ }
+ }
+ return deadline;
+ }
+
+ @Override
+ public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return new AfterAll(continuationTriggers);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code true} if all subtriggers return {@code true}.
+ */
+ @Override
+ public boolean shouldFire(TriggerContext context) throws Exception {
+ for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+ if (!context.forTrigger(subtrigger).trigger().isFinished()
+ && !subtrigger.invokeShouldFire(context)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
+ * because they all must be ready to fire.
+ */
+ @Override
+ public void onOnlyFiring(TriggerContext context) throws Exception {
+ for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+ subtrigger.invokeOnFire(context);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("AfterAll.of(");
+ Joiner.on(", ").appendTo(builder, subTriggers);
+ builder.append(")");
+
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
new file mode 100644
index 0000000..c4bc946
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+import org.joda.time.format.PeriodFormatter;
+
+/**
+ * A base class for triggers that happen after a processing time delay from the arrival
+ * of the first element in a pane.
+ *
+ * <p>This class is for internal use only and may change at any time.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public abstract class AfterDelayFromFirstElement extends OnceTrigger {
+
+ protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
+ ImmutableList.<SerializableFunction<Instant, Instant>>of();
+
+ protected static final StateTag<Object, AccumulatorCombiningState<Instant,
+ Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
+ StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+ "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
+
+ private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
+
+ /**
+ * To complete an implementation, return the desired time from the TriggerContext.
+ */
+ @Nullable
+ public abstract Instant getCurrentTime(Trigger.TriggerContext context);
+
+ /**
+ * To complete an implementation, return a new instance like this one, but incorporating
+ * the provided timestamp mapping functions. Generally should be used by calling the
+ * constructor of this class from the constructor of the subclass.
+ */
+ protected abstract AfterDelayFromFirstElement newWith(
+ List<SerializableFunction<Instant, Instant>> transform);
+
+ /**
+ * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The
+ * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`,
+ * implemented via #computeTargetTimestamp
+ */
+ protected final List<SerializableFunction<Instant, Instant>> timestampMappers;
+
+ private final TimeDomain timeDomain;
+
+ public AfterDelayFromFirstElement(
+ TimeDomain timeDomain,
+ List<SerializableFunction<Instant, Instant>> timestampMappers) {
+ super(null);
+ this.timestampMappers = timestampMappers;
+ this.timeDomain = timeDomain;
+ }
+
+ private Instant getTargetTimestamp(OnElementContext c) {
+ return computeTargetTimestamp(c.currentProcessingTime());
+ }
+
+ /**
+ * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
+ * than the timestamp.
+ *
+ * <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of
+ * CalendarWindows.
+ */
+ public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) {
+ return newWith(new AlignFn(size, offset));
+ }
+
+ /**
+ * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp
+ * since the epoch.
+ */
+ public AfterDelayFromFirstElement alignedTo(final Duration size) {
+ return alignedTo(size, new Instant(0));
+ }
+
+ /**
+ * Adds some delay to the original target time.
+ *
+ * @param delay the delay to add
+ * @return An updated time trigger that will wait the additional time before firing.
+ */
+ public AfterDelayFromFirstElement plusDelayOf(final Duration delay) {
+ return newWith(new DelayFn(delay));
+ }
+
+ /**
+ * @deprecated This will be removed in the next major version. Please use only
+ * {@link #plusDelayOf} and {@link #alignedTo}.
+ */
+ @Deprecated
+ public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> timestampMapper) {
+ return newWith(timestampMapper);
+ }
+
+ @Override
+ public boolean isCompatible(Trigger other) {
+ if (!getClass().equals(other.getClass())) {
+ return false;
+ }
+
+ AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other;
+ return this.timestampMappers.equals(that.timestampMappers);
+ }
+
+
+ private AfterDelayFromFirstElement newWith(
+ SerializableFunction<Instant, Instant> timestampMapper) {
+ return newWith(
+ ImmutableList.<SerializableFunction<Instant, Instant>>builder()
+ .addAll(timestampMappers)
+ .add(timestampMapper)
+ .build());
+ }
+
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
+ "prefetch side effect")
+ public void prefetchOnElement(StateAccessor<?> state) {
+ state.access(DELAYED_UNTIL_TAG).readLater();
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
+ Instant oldDelayUntil = delayUntilState.read();
+
+ // Since processing time can only advance, resulting in target wake-up times we would
+ // ignore anyhow, we don't bother with it if it is already set.
+ if (oldDelayUntil != null) {
+ return;
+ }
+
+ Instant targetTimestamp = getTargetTimestamp(c);
+ delayUntilState.add(targetTimestamp);
+ c.setTimer(targetTimestamp, timeDomain);
+ }
+
+ @Override
+ public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+ super.prefetchOnMerge(state);
+ StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ // NOTE: We could try to delete all timers which are still active, but we would
+ // need access to a timer context for each merging window.
+ // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state :
+ // c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
+ // Instant timestamp = state.get().read();
+ // if (timestamp != null) {
+ // <context for merging window>.deleteTimer(timestamp, timeDomain);
+ // }
+ // }
+ // Instead let them fire and be ignored.
+
+ // If the trigger is already finished, there is no way it will become re-activated
+ if (c.trigger().isFinished()) {
+ StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
+ // NOTE: We do not attempt to delete the timers.
+ return;
+ }
+
+ // Determine the earliest point across all the windows, and delay to that.
+ StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
+
+ Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
+ if (earliestTargetTime != null) {
+ c.setTimer(earliestTargetTime, timeDomain);
+ }
+ }
+
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
+ "prefetch side effect")
+ public void prefetchShouldFire(StateAccessor<?> state) {
+ state.access(DELAYED_UNTIL_TAG).readLater();
+ }
+
+ @Override
+ public void clear(TriggerContext c) throws Exception {
+ c.state().access(DELAYED_UNTIL_TAG).clear();
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
+ return delayedUntil != null
+ && getCurrentTime(context) != null
+ && getCurrentTime(context).isAfter(delayedUntil);
+ }
+
+ @Override
+ protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
+ clear(context);
+ }
+
+ protected Instant computeTargetTimestamp(Instant time) {
+ Instant result = time;
+ for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) {
+ result = timestampMapper.apply(result);
+ }
+ return result;
+ }
+
+ /**
+ * A {@link SerializableFunction} to delay the timestamp at which this triggers fires.
+ */
+ private static final class DelayFn implements SerializableFunction<Instant, Instant> {
+ private final Duration delay;
+
+ public DelayFn(Duration delay) {
+ this.delay = delay;
+ }
+
+ @Override
+ public Instant apply(Instant input) {
+ return input.plus(delay);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object == this) {
+ return true;
+ }
+
+ if (!(object instanceof DelayFn)) {
+ return false;
+ }
+
+ return this.delay.equals(((DelayFn) object).delay);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(delay);
+ }
+
+ @Override
+ public String toString() {
+ return PERIOD_FORMATTER.print(delay.toPeriod());
+ }
+ }
+
+ /**
+ * A {@link SerializableFunction} to align an instant to the nearest interval boundary.
+ */
+ static final class AlignFn implements SerializableFunction<Instant, Instant> {
+ private final Duration size;
+ private final Instant offset;
+
+
+ /**
+ * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
+ * than the timestamp.
+ */
+ public AlignFn(Duration size, Instant offset) {
+ this.size = size;
+ this.offset = offset;
+ }
+
+ @Override
+ public Instant apply(Instant point) {
+ long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis();
+ return millisSinceStart == 0 ? point : point.plus(size).minus(millisSinceStart);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object == this) {
+ return true;
+ }
+
+ if (!(object instanceof AlignFn)) {
+ return false;
+ }
+
+ AlignFn other = (AlignFn) object;
+ return other.size.equals(this.size)
+ && other.offset.equals(this.offset);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(size, offset);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
new file mode 100644
index 0000000..629c640
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * A composite {@link Trigger} that executes its sub-triggers in order.
+ * Only one sub-trigger is executing at a time,
+ * and any time it fires the {@code AfterEach} fires. When the currently executing
+ * sub-trigger finishes, the {@code AfterEach} starts executing the next sub-trigger.
+ *
+ * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished.
+ *
+ * <p>The following properties hold:
+ * <ul>
+ * <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as
+ * {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c)}.
+ * <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as
+ * {@code Repeatedly.forever(a)}, since the repeated trigger never finishes.
+ * </ul>
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterEach extends Trigger {
+
+ private AfterEach(List<Trigger> subTriggers) {
+ super(subTriggers);
+ checkArgument(subTriggers.size() > 1);
+ }
+
+ /**
+ * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
+ */
+ @SafeVarargs
+ public static Trigger inOrder(Trigger... triggers) {
+ return new AfterEach(Arrays.<Trigger>asList(triggers));
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ if (!c.trigger().isMerging()) {
+ // If merges are not possible, we need only run the first unfinished subtrigger
+ c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
+ } else {
+ // If merges are possible, we need to run all subtriggers in parallel
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ // Even if the subTrigger is done, it may be revived via merging and must have
+ // adequate state.
+ subTrigger.invokeOnElement(c);
+ }
+ }
+ }
+
+ @Override
+ public void onMerge(OnMergeContext context) throws Exception {
+ // If merging makes a subtrigger no-longer-finished, it will automatically
+ // begin participating in shouldFire and onFire appropriately.
+
+ // All the following triggers are retroactively "not started" but that is
+ // also automatic because they are cleared whenever this trigger
+ // fires.
+ boolean priorTriggersAllFinished = true;
+ for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
+ if (priorTriggersAllFinished) {
+ subTrigger.invokeOnMerge(context);
+ priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
+ } else {
+ subTrigger.invokeClear(context);
+ }
+ }
+ updateFinishedState(context);
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ // This trigger will fire at least once when the first trigger in the sequence
+ // fires at least once.
+ return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
+ }
+
+ @Override
+ public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return Repeatedly.forever(new AfterFirst(continuationTriggers));
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
+ return firstUnfinished.invokeShouldFire(context);
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception {
+ context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
+
+ // Reset all subtriggers if in a merging context; any may be revived by merging so they are
+ // all run in parallel for each pending pane.
+ if (context.trigger().isMerging()) {
+ for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
+ subTrigger.invokeClear(context);
+ }
+ }
+
+ updateFinishedState(context);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
+ Joiner.on(", ").appendTo(builder, subTriggers);
+ builder.append(")");
+
+ return builder.toString();
+ }
+
+ private void updateFinishedState(TriggerContext context) {
+ context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
new file mode 100644
index 0000000..6b06cfa
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Joiner;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have
+ * fired.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterFirst extends OnceTrigger {
+
+ AfterFirst(List<Trigger> subTriggers) {
+ super(subTriggers);
+ checkArgument(subTriggers.size() > 1);
+ }
+
+ /**
+ * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
+ */
+ public static OnceTrigger of(OnceTrigger... triggers) {
+ return new AfterFirst(Arrays.<Trigger>asList(triggers));
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ subTrigger.invokeOnElement(c);
+ }
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ subTrigger.invokeOnMerge(c);
+ }
+ updateFinishedStatus(c);
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ // This trigger will fire after the earliest of its sub-triggers.
+ Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ for (Trigger subTrigger : subTriggers) {
+ Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
+ if (deadline.isAfter(subDeadline)) {
+ deadline = subDeadline;
+ }
+ }
+ return deadline;
+ }
+
+ @Override
+ public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return new AfterFirst(continuationTriggers);
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+ if (context.forTrigger(subtrigger).trigger().isFinished()
+ || subtrigger.invokeShouldFire(context)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected void onOnlyFiring(TriggerContext context) throws Exception {
+ for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
+ TriggerContext subContext = context.forTrigger(subtrigger);
+ if (subtrigger.invokeShouldFire(subContext)) {
+ // If the trigger is ready to fire, then do whatever it needs to do.
+ subtrigger.invokeOnFire(subContext);
+ } else {
+ // If the trigger is not ready to fire, it is nonetheless true that whatever
+ // pending pane it was tracking is now gone.
+ subtrigger.invokeClear(subContext);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("AfterFirst.of(");
+ Joiner.on(", ").appendTo(builder, subTriggers);
+ builder.append(")");
+
+ return builder.toString();
+ }
+
+ private void updateFinishedStatus(TriggerContext c) {
+ boolean anyFinished = false;
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
+ }
+ c.trigger().setFinished(anyFinished);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
new file mode 100644
index 0000000..8c128dd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.joda.time.Instant;
+
+/**
+ * {@link Trigger}s that fire based on properties of the elements in the current pane.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterPane extends OnceTrigger {
+
+private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+ ELEMENTS_IN_PANE_TAG =
+ StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+ "count", VarLongCoder.of(), new Sum.SumLongFn()));
+
+ private final int countElems;
+
+ private AfterPane(int countElems) {
+ super(null);
+ this.countElems = countElems;
+ }
+
+ /**
+ * Creates a trigger that fires when the pane contains at least {@code countElems} elements.
+ */
+ public static AfterPane elementCountAtLeast(int countElems) {
+ return new AfterPane(countElems);
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
+ }
+
+ @Override
+ public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
+ super.prefetchOnMerge(state);
+ StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
+ }
+
+ @Override
+ public void onMerge(OnMergeContext context) throws Exception {
+ // If we've already received enough elements and finished in some window,
+ // then this trigger is just finished.
+ if (context.trigger().finishedInAnyMergingWindow()) {
+ context.trigger().setFinished(true);
+ StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
+ return;
+ }
+
+ // Otherwise, compute the sum of elements in all the active panes.
+ StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
+ }
+
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
+ "prefetch side effect")
+ public void prefetchShouldFire(StateAccessor<?> state) {
+ state.access(ELEMENTS_IN_PANE_TAG).readLater();
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
+ return count >= countElems;
+ }
+
+ @Override
+ public void clear(TriggerContext c) throws Exception {
+ c.state().access(ELEMENTS_IN_PANE_TAG).clear();
+ }
+
+ @Override
+ public boolean isCompatible(Trigger other) {
+ return this.equals(other);
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return AfterPane.elementCountAtLeast(1);
+ }
+
+ @Override
+ public String toString() {
+ return "AfterPane.elementCountAtLeast(" + countElems + ")";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof AfterPane)) {
+ return false;
+ }
+ AfterPane that = (AfterPane) obj;
+ return this.countElems == that.countElems;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(countElems);
+ }
+
+ @Override
+ protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
+ clear(context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
new file mode 100644
index 0000000..f551118
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in
+ * the real-time domain.
+ *
+ * <p>The time at which to fire the timer can be adjusted via the methods in
+ * {@link AfterDelayFromFirstElement}, such as {@link AfterDelayFromFirstElement#plusDelayOf} or
+ * {@link AfterDelayFromFirstElement#alignedTo}.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterProcessingTime extends AfterDelayFromFirstElement {
+
+ @Override
+ @Nullable
+ public Instant getCurrentTime(Trigger.TriggerContext context) {
+ return context.currentProcessingTime();
+ }
+
+ private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) {
+ super(TimeDomain.PROCESSING_TIME, transforms);
+ }
+
+ /**
+ * Creates a trigger that fires when the current processing time passes the processing time
+ * at which this trigger saw the first element in a pane.
+ */
+ public static AfterProcessingTime pastFirstElementInPane() {
+ return new AfterProcessingTime(IDENTITY);
+ }
+
+ @Override
+ protected AfterProcessingTime newWith(
+ List<SerializableFunction<Instant, Instant>> transforms) {
+ return new AfterProcessingTime(transforms);
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return new AfterSynchronizedProcessingTime();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
+ for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
+ builder
+ .append(".plusDelayOf(")
+ .append(delayFn)
+ .append(")");
+ }
+
+ return builder.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof AfterProcessingTime)) {
+ return false;
+ }
+ AfterProcessingTime that = (AfterProcessingTime) obj;
+ return Objects.equals(this.timestampMappers, that.timestampMappers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass(), this.timestampMappers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
new file mode 100644
index 0000000..59ece10
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.base.Objects;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
+
+ @Override
+ @Nullable
+ public Instant getCurrentTime(Trigger.TriggerContext context) {
+ return context.currentSynchronizedProcessingTime();
+ }
+
+ public AfterSynchronizedProcessingTime() {
+ super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
+ Collections.<SerializableFunction<Instant, Instant>>emptyList());
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj || obj instanceof AfterSynchronizedProcessingTime;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(AfterSynchronizedProcessingTime.class);
+ }
+
+ @Override
+ protected AfterSynchronizedProcessingTime
+ newWith(List<SerializableFunction<Instant, Instant>> transforms) {
+ // ignore transforms
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
new file mode 100644
index 0000000..e2463d8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a
+ * lower-bound, sometimes heuristically established, on event times that have been fully processed
+ * by the pipeline.
+ *
+ * <p>For sources that provide non-heuristic watermarks (e.g.
+ * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the
+ * watermark is a strict guarantee that no data with an event time earlier than
+ * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
+ * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
+ * of the window will be the last pane ever for that window.
+ *
+ * <p>For sources that provide heuristic watermarks (e.g.
+ * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the
+ * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that
+ * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can
+ * often be quite accurate, but the chance of seeing late data for any given window is non-zero.
+ * Thus, if absolute correctness over time is important to your use case, you may want to consider
+ * using a trigger that accounts for late data. The default trigger,
+ * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires
+ * once when the watermark passes the end of the window and then immediately therafter when any
+ * late data arrives, is one such example.
+ *
+ * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
+ *
+ * <p>Additionaly firings before or after the watermark can be requested by calling
+ * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or
+ * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class AfterWatermark {
+
+ private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";
+
+ // Static factory class.
+ private AfterWatermark() {}
+
+ /**
+ * Creates a trigger that fires when the watermark passes the end of the window.
+ */
+ public static FromEndOfWindow pastEndOfWindow() {
+ return new FromEndOfWindow();
+ }
+
+ /**
+ * @see AfterWatermark
+ */
+ public static class AfterWatermarkEarlyAndLate extends Trigger {
+
+ private static final int EARLY_INDEX = 0;
+ private static final int LATE_INDEX = 1;
+
+ private final OnceTrigger earlyTrigger;
+ @Nullable
+ private final OnceTrigger lateTrigger;
+
+ @SuppressWarnings("unchecked")
+ public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) {
+ super(lateTrigger == null
+ ? ImmutableList.<Trigger>of(earlyTrigger)
+ : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger));
+ this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null");
+ this.lateTrigger = lateTrigger;
+ }
+
+ public Trigger withEarlyFirings(OnceTrigger earlyTrigger) {
+ return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
+ }
+
+ public Trigger withLateFirings(OnceTrigger lateTrigger) {
+ return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ if (!c.trigger().isMerging()) {
+ // If merges can never happen, we just run the unfinished subtrigger
+ c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
+ } else {
+ // If merges can happen, we run for all subtriggers because they might be
+ // de-activated or re-activated
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ subTrigger.invokeOnElement(c);
+ }
+ }
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
+ // merged-away windows.
+
+ ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
+ // We check the early trigger to determine if we are still processing it or
+ // if the end of window has transitioned us to the late trigger
+ OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
+
+ // If the early trigger is still active in any merging window then it is still active in
+ // the new merged window, because even if the merged window is "done" some pending elements
+ // haven't had a chance to fire.
+ if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) {
+ earlyContext.trigger().setFinished(false);
+ if (lateTrigger != null) {
+ ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX);
+ OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
+ lateContext.trigger().setFinished(false);
+ lateSubtrigger.invokeClear(lateContext);
+ }
+ } else {
+ // Otherwise the early trigger and end-of-window bit is done for good.
+ earlyContext.trigger().setFinished(true);
+ if (lateTrigger != null) {
+ c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
+ }
+ }
+ }
+
+ @Override
+ public Trigger getContinuationTrigger() {
+ return new AfterWatermarkEarlyAndLate(
+ earlyTrigger.getContinuationTrigger(),
+ lateTrigger == null ? null : lateTrigger.getContinuationTrigger());
+ }
+
+ @Override
+ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ throw new UnsupportedOperationException(
+ "Should not call getContinuationTrigger(List<Trigger>)");
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ // Even without an early or late trigger, we'll still produce a firing at the watermark.
+ return window.maxTimestamp();
+ }
+
+ private boolean endOfWindowReached(Trigger.TriggerContext context) {
+ return context.currentEventTime() != null
+ && context.currentEventTime().isAfter(context.window().maxTimestamp());
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ if (!context.trigger().isFinished(EARLY_INDEX)) {
+ // We have not yet transitioned to late firings.
+ // We should fire if either the trigger is ready or we reach the end of the window.
+ return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
+ || endOfWindowReached(context);
+ } else if (lateTrigger == null) {
+ return false;
+ } else {
+ // We are running the late trigger
+ return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
+ }
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception {
+ if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
+ onNonLateFiring(context);
+ } else if (lateTrigger != null) {
+ onLateFiring(context);
+ } else {
+ // all done
+ context.trigger().setFinished(true);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder(TO_STRING);
+
+ if (!(earlyTrigger instanceof Never.NeverTrigger)) {
+ builder
+ .append(".withEarlyFirings(")
+ .append(earlyTrigger)
+ .append(")");
+ }
+
+ if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) {
+ builder
+ .append(".withLateFirings(")
+ .append(lateTrigger)
+ .append(")");
+ }
+
+ return builder.toString();
+ }
+
+ private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
+ // We have not yet transitioned to late firings.
+ ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
+ Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
+
+ if (!endOfWindowReached(context)) {
+ // This is an early firing, since we have not arrived at the end of the window
+ // Implicitly repeats
+ earlySubtrigger.invokeOnFire(context);
+ earlySubtrigger.invokeClear(context);
+ earlyContext.trigger().setFinished(false);
+ } else {
+ // We have arrived at the end of the window; terminate the early trigger
+ // and clear out the late trigger's state
+ if (earlySubtrigger.invokeShouldFire(context)) {
+ earlySubtrigger.invokeOnFire(context);
+ }
+ earlyContext.trigger().setFinished(true);
+ earlySubtrigger.invokeClear(context);
+
+ if (lateTrigger == null) {
+ // Done if there is no late trigger.
+ context.trigger().setFinished(true);
+ } else {
+ // If there is a late trigger, we transition to it, and need to clear its state
+ // because it was run in parallel.
+ context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
+ }
+ }
+
+ }
+
+ private void onLateFiring(Trigger.TriggerContext context) throws Exception {
+ // We are firing the late trigger, with implicit repeat
+ ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
+ lateSubtrigger.invokeOnFire(context);
+ // It is a OnceTrigger, so it must have finished; unfinished it and clear it
+ lateSubtrigger.invokeClear(context);
+ context.forTrigger(lateSubtrigger).trigger().setFinished(false);
+ }
+ }
+
+ /**
+ * A watermark trigger targeted relative to the end of the window.
+ */
+ public static class FromEndOfWindow extends OnceTrigger {
+
+ private FromEndOfWindow() {
+ super(null);
+ }
+
+ /**
+ * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
+ * the given {@code Trigger} fires before the watermark has passed the end of the window.
+ */
+ public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) {
+ checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
+ return new AfterWatermarkEarlyAndLate(earlyFirings, null);
+ }
+
+ /**
+ * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
+ * the given {@code Trigger} fires after the watermark has passed the end of the window.
+ */
+ public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) {
+ checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
+ return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ // We're interested in knowing when the input watermark passes the end of the window.
+ // (It is possible this has already happened, in which case the timer will be fired
+ // almost immediately).
+ c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
+ // merged-away windows.
+
+ if (!c.trigger().finishedInAllMergingWindows()) {
+ // If the trigger is still active in any merging window then it is still active in the new
+ // merged window, because even if the merged window is "done" some pending elements haven't
+ // had a chance to fire
+ c.trigger().setFinished(false);
+ } else if (!endOfWindowReached(c)) {
+ // If the end of the new window has not been reached, then the trigger is active again.
+ c.trigger().setFinished(false);
+ } else {
+ // Otherwise it is done for good
+ c.trigger().setFinished(true);
+ }
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return window.maxTimestamp();
+ }
+
+ @Override
+ public FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return TO_STRING;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof FromEndOfWindow;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getClass());
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return endOfWindowReached(context);
+ }
+
+ private boolean endOfWindowReached(Trigger.TriggerContext context) {
+ return context.currentEventTime() != null
+ && context.currentEventTime().isAfter(context.window().maxTimestamp());
+ }
+
+ @Override
+ protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
new file mode 100644
index 0000000..d6b72ef
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+ * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details.
+ */
+@Experimental(Experimental.Kind.TRIGGER)
+public class DefaultTrigger extends Trigger{
+
+ private DefaultTrigger() {
+ super(null);
+ }
+
+ /**
+ * Returns the default trigger.
+ */
+ public static DefaultTrigger of() {
+ return new DefaultTrigger();
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ // If the end of the window has already been reached, then we are already ready to fire
+ // and do not need to set a wake-up timer.
+ if (!endOfWindowReached(c)) {
+ c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+ }
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ // If the end of the window has already been reached, then we are already ready to fire
+ // and do not need to set a wake-up timer.
+ if (!endOfWindowReached(c)) {
+ c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+ }
+ }
+
+ @Override
+ public void clear(TriggerContext c) throws Exception { }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return window.maxTimestamp();
+ }
+
+ @Override
+ public boolean isCompatible(Trigger other) {
+ // Semantically, all default triggers are identical
+ return other instanceof DefaultTrigger;
+ }
+
+ @Override
+ public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return this;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return endOfWindowReached(context);
+ }
+
+ private boolean endOfWindowReached(Trigger.TriggerContext context) {
+ return context.currentEventTime() != null
+ && context.currentEventTime().isAfter(context.window().maxTimestamp());
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception { }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
new file mode 100644
index 0000000..5f20465
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.List;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.joda.time.Instant;
+
+/**
+ * A trigger which never fires.
+ *
+ * <p>Using this trigger will only produce output when the watermark passes the end of the
+ * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed
+ * lateness}.
+ */
+public final class Never {
+ /**
+ * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
+ * when the {@link BoundedWindow} closes.
+ */
+ public static OnceTrigger ever() {
+ // NeverTrigger ignores all inputs and is Window-type independent.
+ return new NeverTrigger();
+ }
+
+ // package-private in order to check identity for string formatting.
+ static class NeverTrigger extends OnceTrigger {
+ protected NeverTrigger() {
+ super(null);
+ }
+
+ @Override
+ public void onElement(OnElementContext c) {}
+
+ @Override
+ public void onMerge(OnMergeContext c) {}
+
+ @Override
+ protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return this;
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) {
+ return false;
+ }
+
+ @Override
+ protected void onOnlyFiring(Trigger.TriggerContext context) {
+ throw new UnsupportedOperationException(
+ String.format("%s should never fire", getClass().getSimpleName()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
new file mode 100644
index 0000000..25b7b34
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
+ */
+class OrFinallyTrigger extends Trigger {
+
+ private static final int ACTUAL = 0;
+ private static final int UNTIL = 1;
+
+ @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) {
+ super(Arrays.asList(actual, until));
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
+ c.trigger().subTrigger(UNTIL).invokeOnElement(c);
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ subTrigger.invokeOnMerge(c);
+ }
+ updateFinishedState(c);
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ // This trigger fires once either the trigger or the until trigger fires.
+ Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
+ Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window);
+ return actualDeadline.isBefore(untilDeadline) ? actualDeadline : untilDeadline;
+ }
+
+ @Override
+ public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL
+ // may not be a OnceTrigger.
+ return Repeatedly.forever(
+ new OrFinallyTrigger(
+ continuationTriggers.get(ACTUAL),
+ (Trigger.OnceTrigger) continuationTriggers.get(UNTIL)));
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
+ || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
+ }
+
+ @Override
+ public void onFire(Trigger.TriggerContext context) throws Exception {
+ ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL);
+ ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL);
+
+ if (untilSubtrigger.invokeShouldFire(context)) {
+ untilSubtrigger.invokeOnFire(context);
+ actualSubtrigger.invokeClear(context);
+ } else {
+ // If until didn't fire, then the actual must have (or it is forbidden to call
+ // onFire) so we are done only if actual is done.
+ actualSubtrigger.invokeOnFire(context);
+ // Do not clear the until trigger, because it tracks data cross firings.
+ }
+ updateFinishedState(context);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
+ }
+
+ private void updateFinishedState(TriggerContext c) throws Exception {
+ boolean anyStillFinished = false;
+ for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+ anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
+ }
+ c.trigger().setFinished(anyStillFinished);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
new file mode 100644
index 0000000..8858798
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Repeat a trigger, either until some condition is met or forever.
+ *
+ * <p>For example, to fire after the end of the window, and every time late data arrives:
+ * <pre> {@code
+ * Repeatedly.forever(AfterWatermark.isPastEndOfWindow());
+ * } </pre>
+ *
+ * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite
+ * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}.
+ */
+public class Repeatedly extends Trigger {
+
+ private static final int REPEATED = 0;
+
+ /**
+ * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each
+ * time it fires and ignoring any indications to finish.
+ *
+ * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish.
+ *
+ * @param repeated the trigger to execute repeatedly.
+ */
+ public static Repeatedly forever(Trigger repeated) {
+ return new Repeatedly(repeated);
+ }
+
+ private Repeatedly(Trigger repeated) {
+ super(Arrays.asList(repeated));
+ }
+
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception {
+ getRepeated(c).invokeOnElement(c);
+ }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception {
+ getRepeated(c).invokeOnMerge(c);
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ // This trigger fires once the repeated trigger fires.
+ return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
+ }
+
+ @Override
+ public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return new Repeatedly(continuationTriggers.get(REPEATED));
+ }
+
+ @Override
+ public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+ return getRepeated(context).invokeShouldFire(context);
+ }
+
+ @Override
+ public void onFire(TriggerContext context) throws Exception {
+ getRepeated(context).invokeOnFire(context);
+
+ if (context.trigger().isFinished(REPEATED)) {
+ // Reset tree will recursively clear the finished bits, and invoke clear.
+ context.forTrigger(getRepeated(context)).trigger().resetTree();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
+ }
+
+ private ExecutableTrigger getRepeated(TriggerContext context) {
+ return context.trigger().subTrigger(REPEATED);
+ }
+}