You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/13 22:22:44 UTC
[10/17] incubator-beam git commit: Move triggers to runners-core
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
new file mode 100644
index 0000000..5fe17ad
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Test utility that runs a {@link Trigger}, using in-memory stub implementation to provide
+ * the {@link StateInternals}.
+ *
+ * @param <W> The type of windows being used.
+ */
+public class TriggerTester<InputT, W extends BoundedWindow> {
+
+ /**
+ * A {@link TriggerTester} specialized to {@link Integer} values, so elements and timestamps
+ * can be conflated. Today, triggers should not observed the element type, so this is the
+ * only trigger tester that needs to be used.
+ */
+ public static class SimpleTriggerTester<W extends BoundedWindow>
+ extends TriggerTester<Integer, W> {
+
+ private SimpleTriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
+ super(windowingStrategy);
+ }
+
+ public void injectElements(int... values) throws Exception {
+ List<TimestampedValue<Integer>> timestampedValues =
+ Lists.newArrayListWithCapacity(values.length);
+ for (int value : values) {
+ timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
+ }
+ injectElements(timestampedValues);
+ }
+
+ public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception {
+ return new SimpleTriggerTester<>(
+ windowingStrategy.withAllowedLateness(allowedLateness));
+ }
+ }
+
+ protected final WindowingStrategy<Object, W> windowingStrategy;
+
+ private final TestInMemoryStateInternals<?> stateInternals =
+ new TestInMemoryStateInternals<Object>(null /* key */);
+ private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+ private final TriggerContextFactory<W> contextFactory;
+ private final WindowFn<Object, W> windowFn;
+ private final ActiveWindowSet<W> activeWindows;
+ private final Map<W, W> windowToMergeResult;
+
+ /**
+ * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link Trigger}
+ * under test.
+ */
+ private final ExecutableTrigger executableTrigger;
+
+ /**
+ * A map from a window and trigger to whether that trigger is finished for the window.
+ */
+ private final Map<W, FinishedTriggers> finishedSets;
+
+ public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
+ Trigger trigger, WindowFn<Object, W> windowFn)
+ throws Exception {
+ WindowingStrategy<Object, W> windowingStrategy =
+ WindowingStrategy.of(windowFn).withTrigger(trigger)
+ // Merging requires accumulation mode or early firings can break up a session.
+ // Not currently an issue with the tester (because we never GC) but we don't want
+ // mystery failures due to violating this need.
+ .withMode(windowFn.isNonMerging()
+ ? AccumulationMode.DISCARDING_FIRED_PANES
+ : AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+ return new SimpleTriggerTester<>(windowingStrategy);
+ }
+
+ public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger(
+ Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
+ WindowingStrategy<Object, W> strategy =
+ WindowingStrategy.of(windowFn).withTrigger(trigger)
+ // Merging requires accumulation mode or early firings can break up a session.
+ // Not currently an issue with the tester (because we never GC) but we don't want
+ // mystery failures due to violating this need.
+ .withMode(windowFn.isNonMerging()
+ ? AccumulationMode.DISCARDING_FIRED_PANES
+ : AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+ return new TriggerTester<>(strategy);
+ }
+
+ protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) throws Exception {
+ this.windowingStrategy = windowingStrategy;
+ this.windowFn = windowingStrategy.getWindowFn();
+ this.executableTrigger = windowingStrategy.getTrigger();
+ this.finishedSets = new HashMap<>();
+
+ this.activeWindows =
+ windowFn.isNonMerging()
+ ? new NonMergingActiveWindowSet<W>()
+ : new MergingActiveWindowSet<W>(windowFn, stateInternals);
+ this.windowToMergeResult = new HashMap<>();
+
+ this.contextFactory =
+ new TriggerContextFactory<>(windowingStrategy.getWindowFn(), stateInternals, activeWindows);
+ }
+
+ /**
+ * Instructs the trigger to clear its state for the given window.
+ */
+ public void clearState(W window) throws Exception {
+ executableTrigger.invokeClear(contextFactory.base(window,
+ new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window)));
+ }
+
+ /**
+ * Asserts that the trigger has actually cleared all of its state for the given window. Since
+ * the trigger under test is the root, this makes the assert for all triggers regardless
+ * of their position in the trigger tree.
+ */
+ public void assertCleared(W window) {
+ for (StateNamespace untypedNamespace : stateInternals.getNamespacesInUse()) {
+ if (untypedNamespace instanceof WindowAndTriggerNamespace) {
+ @SuppressWarnings("unchecked")
+ WindowAndTriggerNamespace<W> namespace = (WindowAndTriggerNamespace<W>) untypedNamespace;
+ if (namespace.getWindow().equals(window)) {
+ Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
+ assertTrue("Trigger has not cleared tags: " + tagsInUse, tagsInUse.isEmpty());
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the {@link Trigger} under test is finished for the given window.
+ */
+ public boolean isMarkedFinished(W window) {
+ FinishedTriggers finishedSet = finishedSets.get(window);
+ if (finishedSet == null) {
+ return false;
+ }
+
+ return finishedSet.isFinished(executableTrigger);
+ }
+
+ private StateNamespace windowNamespace(W window) {
+ return StateNamespaces.window(windowFn.windowCoder(), checkNotNull(window));
+ }
+
+ /**
+ * Advance the input watermark to the specified time, then advance the output watermark as far as
+ * possible.
+ */
+ public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+ // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+ timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark);
+ }
+
+ /** Advance the processing time to the specified time. */
+ public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+ // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+ timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime);
+ }
+
+ /**
+ * Inject all the timestamped values (after passing through the window function) as if they
+ * arrived in a single chunk of a bundle (or work-unit).
+ */
+ @SafeVarargs
+ public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
+ injectElements(Arrays.asList(values));
+ }
+
+ public final void injectElements(Collection<TimestampedValue<InputT>> values) throws Exception {
+ for (TimestampedValue<InputT> value : values) {
+ WindowTracing.trace("TriggerTester.injectElements: {}", value);
+ }
+
+ List<WindowedValue<InputT>> windowedValues = Lists.newArrayListWithCapacity(values.size());
+
+ for (TimestampedValue<InputT> input : values) {
+ try {
+ InputT value = input.getValue();
+ Instant timestamp = input.getTimestamp();
+ Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(
+ windowFn, value, timestamp, GlobalWindow.INSTANCE));
+
+ for (W window : assignedWindows) {
+ activeWindows.addActiveForTesting(window);
+
+ // Today, triggers assume onTimer firing at the watermark time, whether or not they
+ // explicitly set the timer themselves. So this tester must set it.
+ timerInternals.setTimer(
+ TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
+ }
+
+ windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ for (WindowedValue<InputT> windowedValue : windowedValues) {
+ for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
+ // SDK is responsible for type safety
+ @SuppressWarnings("unchecked")
+ W window = mergeResult((W) untypedWindow);
+
+ Trigger.OnElementContext context = contextFactory.createOnElementContext(window,
+ new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(),
+ executableTrigger, getFinishedSet(window));
+
+ if (!context.trigger().isFinished()) {
+ executableTrigger.invokeOnElement(context);
+ }
+ }
+ }
+ }
+
+ public boolean shouldFire(W window) throws Exception {
+ Trigger.TriggerContext context = contextFactory.base(
+ window,
+ new TestTimers(windowNamespace(window)),
+ executableTrigger, getFinishedSet(window));
+ executableTrigger.getSpec().prefetchShouldFire(context.state());
+ return executableTrigger.invokeShouldFire(context);
+ }
+
+ public void fireIfShouldFire(W window) throws Exception {
+ Trigger.TriggerContext context = contextFactory.base(
+ window,
+ new TestTimers(windowNamespace(window)),
+ executableTrigger, getFinishedSet(window));
+
+ executableTrigger.getSpec().prefetchShouldFire(context.state());
+ if (executableTrigger.invokeShouldFire(context)) {
+ executableTrigger.getSpec().prefetchOnFire(context.state());
+ executableTrigger.invokeOnFire(context);
+ if (context.trigger().isFinished()) {
+ activeWindows.remove(window);
+ executableTrigger.invokeClear(context);
+ }
+ }
+ }
+
+ public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, boolean value) {
+ getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex), value);
+ }
+
+ /**
+ * Invokes merge from the {@link WindowFn} a single time and passes the resulting merge
+ * events on to the trigger under test. Does not persist the fact that merging happened,
+ * since it is just to test the trigger's {@code OnMerge} method.
+ */
+ public final void mergeWindows() throws Exception {
+ windowToMergeResult.clear();
+ activeWindows.merge(new MergeCallback<W>() {
+ @Override
+ public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {}
+
+ @Override
+ public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+ List<W> activeToBeMerged = new ArrayList<W>();
+ for (W window : toBeMerged) {
+ windowToMergeResult.put(window, mergeResult);
+ if (activeWindows.isActive(window)) {
+ activeToBeMerged.add(window);
+ }
+ }
+ Map<W, FinishedTriggers> mergingFinishedSets =
+ Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
+ for (W oldWindow : activeToBeMerged) {
+ mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
+ }
+ executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
+ new TestTimers(windowNamespace(mergeResult)), executableTrigger,
+ getFinishedSet(mergeResult), mergingFinishedSets));
+ timerInternals.setTimer(TimerData.of(
+ windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
+ }
+ });
+ }
+
+ public W mergeResult(W window) {
+ W result = windowToMergeResult.get(window);
+ return result == null ? window : result;
+ }
+
+ private FinishedTriggers getFinishedSet(W window) {
+ FinishedTriggers finishedSet = finishedSets.get(window);
+ if (finishedSet == null) {
+ finishedSet = FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+ finishedSets.put(window, finishedSet);
+ }
+ return finishedSet;
+ }
+
+ private static class TestAssignContext<W extends BoundedWindow>
+ extends WindowFn<Object, W>.AssignContext {
+ private Object element;
+ private Instant timestamp;
+ private BoundedWindow window;
+
+ public TestAssignContext(
+ WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
+ windowFn.super();
+ this.element = element;
+ this.timestamp = timestamp;
+ this.window = window;
+ }
+
+ @Override
+ public Object element() {
+ return element;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return window;
+ }
+ }
+
+ private class TestTimers implements Timers {
+ private final StateNamespace namespace;
+
+ public TestTimers(StateNamespace namespace) {
+ checkArgument(namespace instanceof WindowNamespace);
+ this.namespace = namespace;
+ }
+
+ @Override
+ public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+ timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+ }
+
+ @Override
+ public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+ timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return timerInternals.currentProcessingTime();
+ }
+
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return timerInternals.currentSynchronizedProcessingTime();
+ }
+
+ @Override
+ public Instant currentEventTime() {
+ return timerInternals.currentInputWatermarkTime();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
deleted file mode 100644
index cc8c97f..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterAll extends OnceTrigger {
-
- private AfterAll(List<Trigger> subTriggers) {
- super(subTriggers);
- checkArgument(subTriggers.size() > 1);
- }
-
- /**
- * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
- */
- public static OnceTrigger of(OnceTrigger... triggers) {
- return new AfterAll(Arrays.<Trigger>asList(triggers));
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) {
- // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
- // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
- subTrigger.invokeOnElement(c);
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnMerge(c);
- }
- boolean allFinished = true;
- for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) {
- allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
- }
- c.trigger().setFinished(allFinished);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- // This trigger will fire after the latest of its sub-triggers.
- Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
- for (Trigger subTrigger : subTriggers) {
- Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
- if (deadline.isBefore(subDeadline)) {
- deadline = subDeadline;
- }
- }
- return deadline;
- }
-
- @Override
- public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return new AfterAll(continuationTriggers);
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if all subtriggers return {@code true}.
- */
- @Override
- public boolean shouldFire(TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- if (!context.forTrigger(subtrigger).trigger().isFinished()
- && !subtrigger.invokeShouldFire(context)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
- * because they all must be ready to fire.
- */
- @Override
- public void onOnlyFiring(TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- subtrigger.invokeOnFire(context);
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder("AfterAll.of(");
- Joiner.on(", ").appendTo(builder, subTriggers);
- builder.append(")");
-
- return builder.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
deleted file mode 100644
index c4bc946..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.ImmutableList;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import java.util.Locale;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-import org.joda.time.format.PeriodFormatter;
-
-/**
- * A base class for triggers that happen after a processing time delay from the arrival
- * of the first element in a pane.
- *
- * <p>This class is for internal use only and may change at any time.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class AfterDelayFromFirstElement extends OnceTrigger {
-
- protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
- ImmutableList.<SerializableFunction<Instant, Instant>>of();
-
- protected static final StateTag<Object, AccumulatorCombiningState<Instant,
- Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
- StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
-
- private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
-
- /**
- * To complete an implementation, return the desired time from the TriggerContext.
- */
- @Nullable
- public abstract Instant getCurrentTime(Trigger.TriggerContext context);
-
- /**
- * To complete an implementation, return a new instance like this one, but incorporating
- * the provided timestamp mapping functions. Generally should be used by calling the
- * constructor of this class from the constructor of the subclass.
- */
- protected abstract AfterDelayFromFirstElement newWith(
- List<SerializableFunction<Instant, Instant>> transform);
-
- /**
- * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The
- * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`,
- * implemented via #computeTargetTimestamp
- */
- protected final List<SerializableFunction<Instant, Instant>> timestampMappers;
-
- private final TimeDomain timeDomain;
-
- public AfterDelayFromFirstElement(
- TimeDomain timeDomain,
- List<SerializableFunction<Instant, Instant>> timestampMappers) {
- super(null);
- this.timestampMappers = timestampMappers;
- this.timeDomain = timeDomain;
- }
-
- private Instant getTargetTimestamp(OnElementContext c) {
- return computeTargetTimestamp(c.currentProcessingTime());
- }
-
- /**
- * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
- * than the timestamp.
- *
- * <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of
- * CalendarWindows.
- */
- public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) {
- return newWith(new AlignFn(size, offset));
- }
-
- /**
- * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp
- * since the epoch.
- */
- public AfterDelayFromFirstElement alignedTo(final Duration size) {
- return alignedTo(size, new Instant(0));
- }
-
- /**
- * Adds some delay to the original target time.
- *
- * @param delay the delay to add
- * @return An updated time trigger that will wait the additional time before firing.
- */
- public AfterDelayFromFirstElement plusDelayOf(final Duration delay) {
- return newWith(new DelayFn(delay));
- }
-
- /**
- * @deprecated This will be removed in the next major version. Please use only
- * {@link #plusDelayOf} and {@link #alignedTo}.
- */
- @Deprecated
- public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> timestampMapper) {
- return newWith(timestampMapper);
- }
-
- @Override
- public boolean isCompatible(Trigger other) {
- if (!getClass().equals(other.getClass())) {
- return false;
- }
-
- AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other;
- return this.timestampMappers.equals(that.timestampMappers);
- }
-
-
- private AfterDelayFromFirstElement newWith(
- SerializableFunction<Instant, Instant> timestampMapper) {
- return newWith(
- ImmutableList.<SerializableFunction<Instant, Instant>>builder()
- .addAll(timestampMappers)
- .add(timestampMapper)
- .build());
- }
-
- @Override
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
- "prefetch side effect")
- public void prefetchOnElement(StateAccessor<?> state) {
- state.access(DELAYED_UNTIL_TAG).readLater();
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
- Instant oldDelayUntil = delayUntilState.read();
-
- // Since processing time can only advance, resulting in target wake-up times we would
- // ignore anyhow, we don't bother with it if it is already set.
- if (oldDelayUntil != null) {
- return;
- }
-
- Instant targetTimestamp = getTargetTimestamp(c);
- delayUntilState.add(targetTimestamp);
- c.setTimer(targetTimestamp, timeDomain);
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
- super.prefetchOnMerge(state);
- StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // NOTE: We could try to delete all timers which are still active, but we would
- // need access to a timer context for each merging window.
- // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state :
- // c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
- // Instant timestamp = state.get().read();
- // if (timestamp != null) {
- // <context for merging window>.deleteTimer(timestamp, timeDomain);
- // }
- // }
- // Instead let them fire and be ignored.
-
- // If the trigger is already finished, there is no way it will become re-activated
- if (c.trigger().isFinished()) {
- StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
- // NOTE: We do not attempt to delete the timers.
- return;
- }
-
- // Determine the earliest point across all the windows, and delay to that.
- StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
-
- Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
- if (earliestTargetTime != null) {
- c.setTimer(earliestTargetTime, timeDomain);
- }
- }
-
- @Override
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
- "prefetch side effect")
- public void prefetchShouldFire(StateAccessor<?> state) {
- state.access(DELAYED_UNTIL_TAG).readLater();
- }
-
- @Override
- public void clear(TriggerContext c) throws Exception {
- c.state().access(DELAYED_UNTIL_TAG).clear();
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
- return delayedUntil != null
- && getCurrentTime(context) != null
- && getCurrentTime(context).isAfter(delayedUntil);
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
- clear(context);
- }
-
- protected Instant computeTargetTimestamp(Instant time) {
- Instant result = time;
- for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) {
- result = timestampMapper.apply(result);
- }
- return result;
- }
-
- /**
- * A {@link SerializableFunction} to delay the timestamp at which this triggers fires.
- */
- private static final class DelayFn implements SerializableFunction<Instant, Instant> {
- private final Duration delay;
-
- public DelayFn(Duration delay) {
- this.delay = delay;
- }
-
- @Override
- public Instant apply(Instant input) {
- return input.plus(delay);
- }
-
- @Override
- public boolean equals(Object object) {
- if (object == this) {
- return true;
- }
-
- if (!(object instanceof DelayFn)) {
- return false;
- }
-
- return this.delay.equals(((DelayFn) object).delay);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(delay);
- }
-
- @Override
- public String toString() {
- return PERIOD_FORMATTER.print(delay.toPeriod());
- }
- }
-
- /**
- * A {@link SerializableFunction} to align an instant to the nearest interval boundary.
- */
- static final class AlignFn implements SerializableFunction<Instant, Instant> {
- private final Duration size;
- private final Instant offset;
-
-
- /**
- * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
- * than the timestamp.
- */
- public AlignFn(Duration size, Instant offset) {
- this.size = size;
- this.offset = offset;
- }
-
- @Override
- public Instant apply(Instant point) {
- long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis();
- return millisSinceStart == 0 ? point : point.plus(size).minus(millisSinceStart);
- }
-
- @Override
- public boolean equals(Object object) {
- if (object == this) {
- return true;
- }
-
- if (!(object instanceof AlignFn)) {
- return false;
- }
-
- AlignFn other = (AlignFn) object;
- return other.size.equals(this.size)
- && other.offset.equals(this.offset);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(size, offset);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
deleted file mode 100644
index 629c640..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * A composite {@link Trigger} that executes its sub-triggers in order.
- * Only one sub-trigger is executing at a time,
- * and any time it fires the {@code AfterEach} fires. When the currently executing
- * sub-trigger finishes, the {@code AfterEach} starts executing the next sub-trigger.
- *
- * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished.
- *
- * <p>The following properties hold:
- * <ul>
- * <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as
- * {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c)}.
- * <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as
- * {@code Repeatedly.forever(a)}, since the repeated trigger never finishes.
- * </ul>
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterEach extends Trigger {
-
- private AfterEach(List<Trigger> subTriggers) {
- super(subTriggers);
- checkArgument(subTriggers.size() > 1);
- }
-
- /**
- * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
- */
- @SafeVarargs
- public static Trigger inOrder(Trigger... triggers) {
- return new AfterEach(Arrays.<Trigger>asList(triggers));
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- if (!c.trigger().isMerging()) {
- // If merges are not possible, we need only run the first unfinished subtrigger
- c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
- } else {
- // If merges are possible, we need to run all subtriggers in parallel
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- // Even if the subTrigger is done, it may be revived via merging and must have
- // adequate state.
- subTrigger.invokeOnElement(c);
- }
- }
- }
-
- @Override
- public void onMerge(OnMergeContext context) throws Exception {
- // If merging makes a subtrigger no-longer-finished, it will automatically
- // begin participating in shouldFire and onFire appropriately.
-
- // All the following triggers are retroactively "not started" but that is
- // also automatic because they are cleared whenever this trigger
- // fires.
- boolean priorTriggersAllFinished = true;
- for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
- if (priorTriggersAllFinished) {
- subTrigger.invokeOnMerge(context);
- priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
- } else {
- subTrigger.invokeClear(context);
- }
- }
- updateFinishedState(context);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- // This trigger will fire at least once when the first trigger in the sequence
- // fires at least once.
- return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
- }
-
- @Override
- public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return Repeatedly.forever(new AfterFirst(continuationTriggers));
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
- return firstUnfinished.invokeShouldFire(context);
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception {
- context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
-
- // Reset all subtriggers if in a merging context; any may be revived by merging so they are
- // all run in parallel for each pending pane.
- if (context.trigger().isMerging()) {
- for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
- subTrigger.invokeClear(context);
- }
- }
-
- updateFinishedState(context);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
- Joiner.on(", ").appendTo(builder, subTriggers);
- builder.append(")");
-
- return builder.toString();
- }
-
- private void updateFinishedState(TriggerContext context) {
- context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
deleted file mode 100644
index 6b06cfa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Joiner;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have
- * fired.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterFirst extends OnceTrigger {
-
- AfterFirst(List<Trigger> subTriggers) {
- super(subTriggers);
- checkArgument(subTriggers.size() > 1);
- }
-
- /**
- * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
- */
- public static OnceTrigger of(OnceTrigger... triggers) {
- return new AfterFirst(Arrays.<Trigger>asList(triggers));
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnElement(c);
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnMerge(c);
- }
- updateFinishedStatus(c);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- // This trigger will fire after the earliest of its sub-triggers.
- Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (Trigger subTrigger : subTriggers) {
- Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
- if (deadline.isAfter(subDeadline)) {
- deadline = subDeadline;
- }
- }
- return deadline;
- }
-
- @Override
- public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return new AfterFirst(continuationTriggers);
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- if (context.forTrigger(subtrigger).trigger().isFinished()
- || subtrigger.invokeShouldFire(context)) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- protected void onOnlyFiring(TriggerContext context) throws Exception {
- for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
- TriggerContext subContext = context.forTrigger(subtrigger);
- if (subtrigger.invokeShouldFire(subContext)) {
- // If the trigger is ready to fire, then do whatever it needs to do.
- subtrigger.invokeOnFire(subContext);
- } else {
- // If the trigger is not ready to fire, it is nonetheless true that whatever
- // pending pane it was tracking is now gone.
- subtrigger.invokeClear(subContext);
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder("AfterFirst.of(");
- Joiner.on(", ").appendTo(builder, subTriggers);
- builder.append(")");
-
- return builder.toString();
- }
-
- private void updateFinishedStatus(TriggerContext c) {
- boolean anyFinished = false;
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
- }
- c.trigger().setFinished(anyFinished);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
deleted file mode 100644
index 8c128dd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import java.util.Objects;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.joda.time.Instant;
-
-/**
- * {@link Trigger}s that fire based on properties of the elements in the current pane.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterPane extends OnceTrigger {
-
-private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
- ELEMENTS_IN_PANE_TAG =
- StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
- "count", VarLongCoder.of(), new Sum.SumLongFn()));
-
- private final int countElems;
-
- private AfterPane(int countElems) {
- super(null);
- this.countElems = countElems;
- }
-
- /**
- * Creates a trigger that fires when the pane contains at least {@code countElems} elements.
- */
- public static AfterPane elementCountAtLeast(int countElems) {
- return new AfterPane(countElems);
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
- }
-
- @Override
- public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
- super.prefetchOnMerge(state);
- StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
- }
-
- @Override
- public void onMerge(OnMergeContext context) throws Exception {
- // If we've already received enough elements and finished in some window,
- // then this trigger is just finished.
- if (context.trigger().finishedInAnyMergingWindow()) {
- context.trigger().setFinished(true);
- StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
- return;
- }
-
- // Otherwise, compute the sum of elements in all the active panes.
- StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
- }
-
- @Override
- @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification =
- "prefetch side effect")
- public void prefetchShouldFire(StateAccessor<?> state) {
- state.access(ELEMENTS_IN_PANE_TAG).readLater();
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
- return count >= countElems;
- }
-
- @Override
- public void clear(TriggerContext c) throws Exception {
- c.state().access(ELEMENTS_IN_PANE_TAG).clear();
- }
-
- @Override
- public boolean isCompatible(Trigger other) {
- return this.equals(other);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return AfterPane.elementCountAtLeast(1);
- }
-
- @Override
- public String toString() {
- return "AfterPane.elementCountAtLeast(" + countElems + ")";
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof AfterPane)) {
- return false;
- }
- AfterPane that = (AfterPane) obj;
- return this.countElems == that.countElems;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(countElems);
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
- clear(context);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
deleted file mode 100644
index f551118..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in
- * the real-time domain.
- *
- * <p>The time at which to fire the timer can be adjusted via the methods in
- * {@link AfterDelayFromFirstElement}, such as {@link AfterDelayFromFirstElement#plusDelayOf} or
- * {@link AfterDelayFromFirstElement#alignedTo}.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterProcessingTime extends AfterDelayFromFirstElement {
-
- @Override
- @Nullable
- public Instant getCurrentTime(Trigger.TriggerContext context) {
- return context.currentProcessingTime();
- }
-
- private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) {
- super(TimeDomain.PROCESSING_TIME, transforms);
- }
-
- /**
- * Creates a trigger that fires when the current processing time passes the processing time
- * at which this trigger saw the first element in a pane.
- */
- public static AfterProcessingTime pastFirstElementInPane() {
- return new AfterProcessingTime(IDENTITY);
- }
-
- @Override
- protected AfterProcessingTime newWith(
- List<SerializableFunction<Instant, Instant>> transforms) {
- return new AfterProcessingTime(transforms);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return new AfterSynchronizedProcessingTime();
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()");
- for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) {
- builder
- .append(".plusDelayOf(")
- .append(delayFn)
- .append(")");
- }
-
- return builder.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof AfterProcessingTime)) {
- return false;
- }
- AfterProcessingTime that = (AfterProcessingTime) obj;
- return Objects.equals(this.timestampMappers, that.timestampMappers);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass(), this.timestampMappers);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
deleted file mode 100644
index 59ece10..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.base.Objects;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
-
- @Override
- @Nullable
- public Instant getCurrentTime(Trigger.TriggerContext context) {
- return context.currentSynchronizedProcessingTime();
- }
-
- public AfterSynchronizedProcessingTime() {
- super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
- Collections.<SerializableFunction<Instant, Instant>>emptyList());
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
-
- @Override
- public String toString() {
- return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
- }
-
- @Override
- public boolean equals(Object obj) {
- return this == obj || obj instanceof AfterSynchronizedProcessingTime;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(AfterSynchronizedProcessingTime.class);
- }
-
- @Override
- protected AfterSynchronizedProcessingTime
- newWith(List<SerializableFunction<Instant, Instant>> transforms) {
- // ignore transforms
- return this;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
deleted file mode 100644
index e2463d8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a
- * lower-bound, sometimes heuristically established, on event times that have been fully processed
- * by the pipeline.
- *
- * <p>For sources that provide non-heuristic watermarks (e.g.
- * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the
- * watermark is a strict guarantee that no data with an event time earlier than
- * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
- * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
- * of the window will be the last pane ever for that window.
- *
- * <p>For sources that provide heuristic watermarks (e.g.
- * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the
- * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that
- * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can
- * often be quite accurate, but the chance of seeing late data for any given window is non-zero.
- * Thus, if absolute correctness over time is important to your use case, you may want to consider
- * using a trigger that accounts for late data. The default trigger,
- * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires
- * once when the watermark passes the end of the window and then immediately therafter when any
- * late data arrives, is one such example.
- *
- * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
- *
- * <p>Additionaly firings before or after the watermark can be requested by calling
- * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or
- * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterWatermark {
-
- private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()";
-
- // Static factory class.
- private AfterWatermark() {}
-
- /**
- * Creates a trigger that fires when the watermark passes the end of the window.
- */
- public static FromEndOfWindow pastEndOfWindow() {
- return new FromEndOfWindow();
- }
-
- /**
- * @see AfterWatermark
- */
- public static class AfterWatermarkEarlyAndLate extends Trigger {
-
- private static final int EARLY_INDEX = 0;
- private static final int LATE_INDEX = 1;
-
- private final OnceTrigger earlyTrigger;
- @Nullable
- private final OnceTrigger lateTrigger;
-
- @SuppressWarnings("unchecked")
- public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) {
- super(lateTrigger == null
- ? ImmutableList.<Trigger>of(earlyTrigger)
- : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger));
- this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null");
- this.lateTrigger = lateTrigger;
- }
-
- public Trigger withEarlyFirings(OnceTrigger earlyTrigger) {
- return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
- }
-
- public Trigger withLateFirings(OnceTrigger lateTrigger) {
- return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- if (!c.trigger().isMerging()) {
- // If merges can never happen, we just run the unfinished subtrigger
- c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
- } else {
- // If merges can happen, we run for all subtriggers because they might be
- // de-activated or re-activated
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnElement(c);
- }
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
- // merged-away windows.
-
- ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
- // We check the early trigger to determine if we are still processing it or
- // if the end of window has transitioned us to the late trigger
- OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
-
- // If the early trigger is still active in any merging window then it is still active in
- // the new merged window, because even if the merged window is "done" some pending elements
- // haven't had a chance to fire.
- if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) {
- earlyContext.trigger().setFinished(false);
- if (lateTrigger != null) {
- ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX);
- OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
- lateContext.trigger().setFinished(false);
- lateSubtrigger.invokeClear(lateContext);
- }
- } else {
- // Otherwise the early trigger and end-of-window bit is done for good.
- earlyContext.trigger().setFinished(true);
- if (lateTrigger != null) {
- c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
- }
- }
- }
-
- @Override
- public Trigger getContinuationTrigger() {
- return new AfterWatermarkEarlyAndLate(
- earlyTrigger.getContinuationTrigger(),
- lateTrigger == null ? null : lateTrigger.getContinuationTrigger());
- }
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- throw new UnsupportedOperationException(
- "Should not call getContinuationTrigger(List<Trigger>)");
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- // Even without an early or late trigger, we'll still produce a firing at the watermark.
- return window.maxTimestamp();
- }
-
- private boolean endOfWindowReached(Trigger.TriggerContext context) {
- return context.currentEventTime() != null
- && context.currentEventTime().isAfter(context.window().maxTimestamp());
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- if (!context.trigger().isFinished(EARLY_INDEX)) {
- // We have not yet transitioned to late firings.
- // We should fire if either the trigger is ready or we reach the end of the window.
- return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
- || endOfWindowReached(context);
- } else if (lateTrigger == null) {
- return false;
- } else {
- // We are running the late trigger
- return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
- }
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception {
- if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
- onNonLateFiring(context);
- } else if (lateTrigger != null) {
- onLateFiring(context);
- } else {
- // all done
- context.trigger().setFinished(true);
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder(TO_STRING);
-
- if (!(earlyTrigger instanceof Never.NeverTrigger)) {
- builder
- .append(".withEarlyFirings(")
- .append(earlyTrigger)
- .append(")");
- }
-
- if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) {
- builder
- .append(".withLateFirings(")
- .append(lateTrigger)
- .append(")");
- }
-
- return builder.toString();
- }
-
- private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
- // We have not yet transitioned to late firings.
- ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
- Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
-
- if (!endOfWindowReached(context)) {
- // This is an early firing, since we have not arrived at the end of the window
- // Implicitly repeats
- earlySubtrigger.invokeOnFire(context);
- earlySubtrigger.invokeClear(context);
- earlyContext.trigger().setFinished(false);
- } else {
- // We have arrived at the end of the window; terminate the early trigger
- // and clear out the late trigger's state
- if (earlySubtrigger.invokeShouldFire(context)) {
- earlySubtrigger.invokeOnFire(context);
- }
- earlyContext.trigger().setFinished(true);
- earlySubtrigger.invokeClear(context);
-
- if (lateTrigger == null) {
- // Done if there is no late trigger.
- context.trigger().setFinished(true);
- } else {
- // If there is a late trigger, we transition to it, and need to clear its state
- // because it was run in parallel.
- context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
- }
- }
-
- }
-
- private void onLateFiring(Trigger.TriggerContext context) throws Exception {
- // We are firing the late trigger, with implicit repeat
- ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
- lateSubtrigger.invokeOnFire(context);
- // It is a OnceTrigger, so it must have finished; unfinished it and clear it
- lateSubtrigger.invokeClear(context);
- context.forTrigger(lateSubtrigger).trigger().setFinished(false);
- }
- }
-
- /**
- * A watermark trigger targeted relative to the end of the window.
- */
- public static class FromEndOfWindow extends OnceTrigger {
-
- private FromEndOfWindow() {
- super(null);
- }
-
- /**
- * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
- * the given {@code Trigger} fires before the watermark has passed the end of the window.
- */
- public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) {
- checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
- return new AfterWatermarkEarlyAndLate(earlyFirings, null);
- }
-
- /**
- * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever
- * the given {@code Trigger} fires after the watermark has passed the end of the window.
- */
- public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) {
- checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
- return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- // We're interested in knowing when the input watermark passes the end of the window.
- // (It is possible this has already happened, in which case the timer will be fired
- // almost immediately).
- c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
- // merged-away windows.
-
- if (!c.trigger().finishedInAllMergingWindows()) {
- // If the trigger is still active in any merging window then it is still active in the new
- // merged window, because even if the merged window is "done" some pending elements haven't
- // had a chance to fire
- c.trigger().setFinished(false);
- } else if (!endOfWindowReached(c)) {
- // If the end of the new window has not been reached, then the trigger is active again.
- c.trigger().setFinished(false);
- } else {
- // Otherwise it is done for good
- c.trigger().setFinished(true);
- }
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return window.maxTimestamp();
- }
-
- @Override
- public FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
-
- @Override
- public String toString() {
- return TO_STRING;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof FromEndOfWindow;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass());
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return endOfWindowReached(context);
- }
-
- private boolean endOfWindowReached(Trigger.TriggerContext context) {
- return context.currentEventTime() != null
- && context.currentEventTime().isAfter(context.window().maxTimestamp());
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
deleted file mode 100644
index d6b72ef..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
- * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class DefaultTrigger extends Trigger{
-
- private DefaultTrigger() {
- super(null);
- }
-
- /**
- * Returns the default trigger.
- */
- public static DefaultTrigger of() {
- return new DefaultTrigger();
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- // If the end of the window has already been reached, then we are already ready to fire
- // and do not need to set a wake-up timer.
- if (!endOfWindowReached(c)) {
- c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
- }
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- // If the end of the window has already been reached, then we are already ready to fire
- // and do not need to set a wake-up timer.
- if (!endOfWindowReached(c)) {
- c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
- }
- }
-
- @Override
- public void clear(TriggerContext c) throws Exception { }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return window.maxTimestamp();
- }
-
- @Override
- public boolean isCompatible(Trigger other) {
- // Semantically, all default triggers are identical
- return other instanceof DefaultTrigger;
- }
-
- @Override
- public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return endOfWindowReached(context);
- }
-
- private boolean endOfWindowReached(Trigger.TriggerContext context) {
- return context.currentEventTime() != null
- && context.currentEventTime().isAfter(context.window().maxTimestamp());
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception { }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
deleted file mode 100644
index 5f20465..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.joda.time.Instant;
-
-/**
- * A trigger which never fires.
- *
- * <p>Using this trigger will only produce output when the watermark passes the end of the
- * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed
- * lateness}.
- */
-public final class Never {
- /**
- * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
- * when the {@link BoundedWindow} closes.
- */
- public static OnceTrigger ever() {
- // NeverTrigger ignores all inputs and is Window-type independent.
- return new NeverTrigger();
- }
-
- // package-private in order to check identity for string formatting.
- static class NeverTrigger extends OnceTrigger {
- protected NeverTrigger() {
- super(null);
- }
-
- @Override
- public void onElement(OnElementContext c) {}
-
- @Override
- public void onMerge(OnMergeContext c) {}
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) {
- return false;
- }
-
- @Override
- protected void onOnlyFiring(Trigger.TriggerContext context) {
- throw new UnsupportedOperationException(
- String.format("%s should never fire", getClass().getSimpleName()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
deleted file mode 100644
index 25b7b34..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
- */
-class OrFinallyTrigger extends Trigger {
-
- private static final int ACTUAL = 0;
- private static final int UNTIL = 1;
-
- @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) {
- super(Arrays.asList(actual, until));
- }
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
- c.trigger().subTrigger(UNTIL).invokeOnElement(c);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- subTrigger.invokeOnMerge(c);
- }
- updateFinishedState(c);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- // This trigger fires once either the trigger or the until trigger fires.
- Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
- Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window);
- return actualDeadline.isBefore(untilDeadline) ? actualDeadline : untilDeadline;
- }
-
- @Override
- public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL
- // may not be a OnceTrigger.
- return Repeatedly.forever(
- new OrFinallyTrigger(
- continuationTriggers.get(ACTUAL),
- (Trigger.OnceTrigger) continuationTriggers.get(UNTIL)));
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
- || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
- }
-
- @Override
- public void onFire(Trigger.TriggerContext context) throws Exception {
- ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL);
- ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL);
-
- if (untilSubtrigger.invokeShouldFire(context)) {
- untilSubtrigger.invokeOnFire(context);
- actualSubtrigger.invokeClear(context);
- } else {
- // If until didn't fire, then the actual must have (or it is forbidden to call
- // onFire) so we are done only if actual is done.
- actualSubtrigger.invokeOnFire(context);
- // Do not clear the until trigger, because it tracks data cross firings.
- }
- updateFinishedState(context);
- }
-
- @Override
- public String toString() {
- return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
- }
-
- private void updateFinishedState(TriggerContext c) throws Exception {
- boolean anyStillFinished = false;
- for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
- anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
- }
- c.trigger().setFinished(anyStillFinished);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
deleted file mode 100644
index 8858798..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.joda.time.Instant;
-
-/**
- * Repeat a trigger, either until some condition is met or forever.
- *
- * <p>For example, to fire after the end of the window, and every time late data arrives:
- * <pre> {@code
- * Repeatedly.forever(AfterWatermark.isPastEndOfWindow());
- * } </pre>
- *
- * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite
- * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}.
- */
-public class Repeatedly extends Trigger {
-
- private static final int REPEATED = 0;
-
- /**
- * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each
- * time it fires and ignoring any indications to finish.
- *
- * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish.
- *
- * @param repeated the trigger to execute repeatedly.
- */
- public static Repeatedly forever(Trigger repeated) {
- return new Repeatedly(repeated);
- }
-
- private Repeatedly(Trigger repeated) {
- super(Arrays.asList(repeated));
- }
-
-
- @Override
- public void onElement(OnElementContext c) throws Exception {
- getRepeated(c).invokeOnElement(c);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- getRepeated(c).invokeOnMerge(c);
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- // This trigger fires once the repeated trigger fires.
- return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window);
- }
-
- @Override
- public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return new Repeatedly(continuationTriggers.get(REPEATED));
- }
-
- @Override
- public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
- return getRepeated(context).invokeShouldFire(context);
- }
-
- @Override
- public void onFire(TriggerContext context) throws Exception {
- getRepeated(context).invokeOnFire(context);
-
- if (context.trigger().isFinished(REPEATED)) {
- // Reset tree will recursively clear the finished bits, and invoke clear.
- context.forTrigger(getRepeated(context)).trigger().resetTree();
- }
- }
-
- @Override
- public String toString() {
- return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED));
- }
-
- private ExecutableTrigger getRepeated(TriggerContext context) {
- return context.trigger().subTrigger(REPEATED);
- }
-}