You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/22 02:22:22 UTC
[08/12] incubator-beam git commit: Move some easy stuff into
runners/core-java
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
new file mode 100644
index 0000000..f104f6a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+import org.joda.time.Instant;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Executes a trigger while managing persistence of information about which subtriggers are
+ * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
+ *
+ * <p>Specifically, the responsibilities are:
+ *
+ * <ul>
+ * <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
+ * constructing the appropriate trigger contexts.</li>
+ * <li>Committing a record of which subtriggers are finished to persistent state.</li>
+ * <li>Restoring the record of which subtriggers are finished from persistent state.</li>
+ * <li>Clearing out the persisted finished set when a caller indicates
+ * (via {#link #clearFinished}) that it is no longer needed.</li>
+ * </ul>
+ *
+ * <p>These responsibilities are intertwined: trigger contexts include mutable information about
+ * which subtriggers are finished. This class provides the information when building the contexts
+ * and commits the information when the method of the {@link ExecutableTrigger} returns.
+ *
+ * @param <W> The kind of windows being processed.
+ */
+public class TriggerRunner<W extends BoundedWindow> {
+ @VisibleForTesting
+ static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
+ StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
+
+ private final ExecutableTrigger rootTrigger;
+ private final TriggerContextFactory<W> contextFactory;
+
+ public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
+ Preconditions.checkState(rootTrigger.getTriggerIndex() == 0);
+ this.rootTrigger = rootTrigger;
+ this.contextFactory = contextFactory;
+ }
+
+ private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
+ if (!isFinishedSetNeeded()) {
+ // If no trigger in the tree will ever have finished bits, then we don't need to read them.
+ // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
+ // finished) for each trigger in the tree.
+ return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+ }
+
+ BitSet bitSet = state.read();
+ return bitSet == null
+ ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
+ : FinishedTriggersBitSet.fromBitSet(bitSet);
+ }
+
+
+ private void clearFinishedBits(ValueState<BitSet> state) {
+ if (!isFinishedSetNeeded()) {
+ // Nothing to clear.
+ return;
+ }
+ state.clear();
+ }
+
+ /** Return true if the trigger is closed in the window corresponding to the specified state. */
+ public boolean isClosed(StateAccessor<?> state) {
+ return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
+ }
+
+ public void prefetchForValue(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchOnElement(
+ contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ public void prefetchOnFire(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ public void prefetchShouldFire(W window, StateAccessor<?> state) {
+ if (isFinishedSetNeeded()) {
+ state.access(FINISHED_BITS_TAG).readLater();
+ }
+ rootTrigger.getSpec().prefetchShouldFire(
+ contextFactory.createStateAccessor(window, rootTrigger));
+ }
+
+ /**
+ * Run the trigger logic to deal with a new value.
+ */
+ public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
+ throws Exception {
+ // Clone so that we can detect changes and so that changes here don't pollute merging.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
+ window, timers, timestamp, rootTrigger, finishedSet);
+ rootTrigger.invokeOnElement(triggerContext);
+ persistFinishedSet(state, finishedSet);
+ }
+
+ public void prefetchForMerge(
+ W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
+ if (isFinishedSetNeeded()) {
+ for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
+ value.readLater();
+ }
+ }
+ rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
+ window, mergingWindows, rootTrigger));
+ }
+
+ /**
+ * Run the trigger merging logic as part of executing the specified merge.
+ */
+ public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
+ // Clone so that we can detect changes and so that changes here don't pollute merging.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+
+ // And read the finished bits in each merging window.
+ ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
+ for (Map.Entry<W, ValueState<BitSet>> entry :
+ state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
+ // Don't need to clone these, since the trigger context doesn't allow modification
+ builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
+ // Clear the underlying finished bits.
+ clearFinishedBits(entry.getValue());
+ }
+ ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
+
+ Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
+ window, timers, rootTrigger, finishedSet, mergingFinishedSets);
+
+ // Run the merge from the trigger
+ rootTrigger.invokeOnMerge(mergeContext);
+
+ persistFinishedSet(state, finishedSet);
+ }
+
+ public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ Trigger.TriggerContext context = contextFactory.base(window, timers,
+ rootTrigger, finishedSet);
+ return rootTrigger.invokeShouldFire(context);
+ }
+
+ public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ // shouldFire should be false.
+ // However it is too expensive to assert.
+ FinishedTriggersBitSet finishedSet =
+ readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
+ Trigger.TriggerContext context = contextFactory.base(window, timers,
+ rootTrigger, finishedSet);
+ rootTrigger.invokeOnFire(context);
+ persistFinishedSet(state, finishedSet);
+ }
+
+ private void persistFinishedSet(
+ StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+ if (!isFinishedSetNeeded()) {
+ return;
+ }
+
+ ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+ if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
+ if (modifiedFinishedSet.getBitSet().isEmpty()) {
+ finishedSetState.clear();
+ } else {
+ finishedSetState.write(modifiedFinishedSet.getBitSet());
+ }
+ }
+ }
+
+ /**
+ * Clear the finished bits.
+ */
+ public void clearFinished(StateAccessor<?> state) {
+ clearFinishedBits(state.access(FINISHED_BITS_TAG));
+ }
+
+ /**
+ * Clear the state used for executing triggers, but leave the finished set to indicate
+ * the window is closed.
+ */
+ public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
+ // Don't need to clone, because we'll be clearing the finished bits anyways.
+ FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
+ rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
+ }
+
+ private boolean isFinishedSetNeeded() {
+ // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
+ // lookup. Right now, we special case this for the DefaultTrigger.
+ return !(rootTrigger.getSpec() instanceof DefaultTrigger);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
new file mode 100644
index 0000000..14ec082
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implements the logic to hold the output watermark for a computation back
+ * until it has seen all the elements it needs based on the input watermark for the
+ * computation.
+ *
+ * <p>The backend ensures the output watermark can never progress beyond the
+ * input watermark for a computation. GroupAlsoByWindows computations may add a 'hold'
+ * to the output watermark in order to prevent it progressing beyond a time within a window.
+ * The hold will be 'cleared' when the associated pane is emitted.
+ *
+ * <p>This class is only intended for use by {@link ReduceFnRunner}. The two evolve together and
+ * will likely break any other uses.
+ *
+ * @param <W> The kind of {@link BoundedWindow} the hold is for.
+ */
+class WatermarkHold<W extends BoundedWindow> implements Serializable {
+ /**
+ * Return tag for state containing the output watermark hold
+ * used for elements.
+ */
+ public static <W extends BoundedWindow>
+ StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(
+ OutputTimeFn<? super W> outputTimeFn) {
+ return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal(
+ StateTags.<W>watermarkStateInternal("hold", outputTimeFn));
+ }
+
+ /**
+ * Tag for state containing end-of-window and garbage collection output watermark holds.
+ * (We can't piggy-back on the data hold state since the outputTimeFn may be
+ * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will
+ * would take the end-of-window time as its element time.)
+ */
+ @VisibleForTesting
+ public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG =
+ StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
+ "extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
+
+ private final TimerInternals timerInternals;
+ private final WindowingStrategy<?, W> windowingStrategy;
+ private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;
+
+ public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
+ this.timerInternals = timerInternals;
+ this.windowingStrategy = windowingStrategy;
+ this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
+ }
+
+ /**
+ * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
+ * of the element in {@code context}. We allow the actual hold time to be shifted later by
+ * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will
+ * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
+ * was placed, or {@literal null} if no hold was placed.
+ *
+ * <p>In the following we'll write {@code E} to represent an element's timestamp after passing
+ * through the window strategy's output time function, {@code IWM} for the local input watermark,
+ * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection
+ * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right,
+ * and we write {@code [ ... ]} to denote a bounded window with implied lower bound.
+ *
+ * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness}
+ * is {@code ZERO}.
+ *
+ * <p>Here are the cases we need to handle. They are conceptually considered in the
+ * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM.
+ * <ol>
+ * <li>(Normal)
+ * <pre>
+ * |
+ * [ | E ]
+ * |
+ * IWM
+ * </pre>
+ * This is, hopefully, the common and happy case. The element is locally on-time and can
+ * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer
+ * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's
+ * timestamp (depending on the output time function). Thus the OWM will not proceed past E
+ * until the next pane fires.
+ *
+ * <li>(Discard - no target window)
+ * <pre>
+ * | |
+ * [ E ] | |
+ * | |
+ * GCWM <-getAllowedLateness-> IWM
+ * </pre>
+ * The element is very locally late. The window has been garbage collected, thus there
+ * is no target pane E could be assigned to. We discard E.
+ *
+ * <li>(Unobservably late)
+ * <pre>
+ * | |
+ * [ | E | ]
+ * | |
+ * OWM IWM
+ * </pre>
+ * The element is locally late, however we can still treat this case as for 'Normal' above
+ * since the IWM has not yet passed the end of the window and the element is ahead of the
+ * OWM. In effect, we get to 'launder' the locally late element and consider it as locally
+ * on-time because no downstream computation can observe the difference.
+ *
+ * <li>(Maybe late 1)
+ * <pre>
+ * | |
+ * [ | E ] |
+ * | |
+ * OWM IWM
+ * </pre>
+ * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME}
+ * pane may have already been emitted. However, if timer firings have been delayed then it
+ * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element
+ * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find
+ * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's
+ * timestamp. We may however set a garbage collection hold if required.
+ *
+ * <li>(Maybe late 2)
+ * <pre>
+ * | |
+ * [ E | | ]
+ * | |
+ * OWM IWM
+ * </pre>
+ * The end-of-window timer has not yet fired, so this element may still appear in an
+ * {@code ON_TIME} pane. However the element is too late to contribute to the output
+ * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an
+ * end-of-window hold.
+ *
+ * <li>(Maybe late 3)
+ * <pre>
+ * | |
+ * [ E | ] |
+ * | |
+ * OWM IWM
+ * </pre>
+ * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer
+ * has already fired, or it is about to fire. We can place only the garbage collection hold,
+ * if required.
+ *
+ * <li>(Definitely late)
+ * <pre>
+ * | |
+ * [ E ] | |
+ * | |
+ * OWM IWM
+ * </pre>
+ * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to
+ * place an end-of-window hold. We can still place a garbage collection hold if required.
+ *
+ * </ol>
+ */
+ @Nullable
+ public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
+ Instant hold = addElementHold(context);
+ if (hold == null) {
+ hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/);
+ }
+ return hold;
+ }
+
+ /**
+ * Return {@code timestamp}, possibly shifted forward in time according to the window
+ * strategy's output time function.
+ */
+ private Instant shift(Instant timestamp, W window) {
+ Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
+ Preconditions.checkState(!shifted.isBefore(timestamp),
+ "OutputTimeFn moved element from %s to earlier time %s for window %s",
+ timestamp, shifted, window);
+ Preconditions.checkState(timestamp.isAfter(window.maxTimestamp())
+ || !shifted.isAfter(window.maxTimestamp()),
+ "OutputTimeFn moved element from %s to %s which is beyond end of "
+ + "window %s",
+ timestamp, shifted, window);
+
+ return shifted;
+ }
+
+ /**
+ * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
+ * added (ie the element timestamp plus any forward shift requested by the
+ * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added.
+ * The hold is only added if both:
+ * <ol>
+ * <li>The backend will be able to respect it. In other words the output watermark cannot
+ * be ahead of the proposed hold time.
+ * <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the
+ * window. In other words the input watermark cannot be ahead of the end of the window.
+ * </ol>
+ * The hold ensures the pane which incorporates the element is will not be considered late by
+ * any downstream computation when it is eventually emitted.
+ */
+ @Nullable
+ private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
+ // Give the window function a chance to move the hold timestamp forward to encourage progress.
+ // (A later hold implies less impediment to the output watermark making progress, which in
+ // turn encourages end-of-window triggers to fire earlier in following computations.)
+ Instant elementHold = shift(context.timestamp(), context.window());
+
+ Instant outputWM = timerInternals.currentOutputWatermarkTime();
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+ String which;
+ boolean tooLate;
+ // TODO: These case labels could be tightened.
+ // See the case analysis in addHolds above for the motivation.
+ if (outputWM != null && elementHold.isBefore(outputWM)) {
+ which = "too late to effect output watermark";
+ tooLate = true;
+ } else if (context.window().maxTimestamp().isBefore(inputWM)) {
+ which = "too late for end-of-window timer";
+ tooLate = true;
+ } else {
+ which = "on time";
+ tooLate = false;
+ Preconditions.checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Element hold %s is beyond end-of-time", elementHold);
+ context.state().access(elementHoldTag).add(elementHold);
+ }
+ WindowTracing.trace(
+ "WatermarkHold.addHolds: element hold at {} is {} for "
+ + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+ elementHold, which, context.key(), context.window(), inputWM,
+ outputWM);
+
+ return tooLate ? null : elementHold;
+ }
+
+ /**
+ * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required).
+ * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added.
+ */
+ @Nullable
+ private Instant addEndOfWindowOrGarbageCollectionHolds(
+ ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+ Instant hold = addEndOfWindowHold(context, paneIsEmpty);
+ if (hold == null) {
+ hold = addGarbageCollectionHold(context, paneIsEmpty);
+ }
+ return hold;
+ }
+
+ /**
+ * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
+ * (ie the end of window time), or {@literal null} if no end of window hold is possible and we
+ * should fallback to a garbage collection hold.
+ *
+ * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
+ * to clear it. In other words, the input watermark cannot be ahead of the end of window time.
+ *
+ * <p>An end-of-window hold is added in two situations:
+ * <ol>
+ * <li>An incoming element came in behind the output watermark (so we are too late for placing
+ * the usual element hold), but it may still be possible to include the element in an
+ * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
+ * not be considered late by any downstream computation.
+ * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at
+ * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in
+ * a pane are processed due to a fired trigger we must set both an end of window timer and an end
+ * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered
+ * late by any downstream computation.
+ * </ol>
+ */
+ @Nullable
+ private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+ Instant outputWM = timerInternals.currentOutputWatermarkTime();
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+ Instant eowHold = context.window().maxTimestamp();
+
+ if (eowHold.isBefore(inputWM)) {
+ WindowTracing.trace(
+ "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
+ + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+ eowHold, context.key(), context.window(), inputWM, outputWM);
+ return null;
+ }
+
+ Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM),
+ "End-of-window hold %s cannot be before output watermark %s",
+ eowHold, outputWM);
+ Preconditions.checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "End-of-window hold %s is beyond end-of-time", eowHold);
+ // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
+ // the hold away from the combining function in elementHoldTag.
+ // However if !paneIsEmpty then it could make sense to use the elementHoldTag here.
+ // Alas, onMerge is forced to add an end of window or garbage collection hold without
+ // knowing whether an element hold is already in place (stopping to check is too expensive).
+ // This it would end up adding an element hold at the end of the window which could
+ // upset the elementHoldTag combining function.
+ context.state().access(EXTRA_HOLD_TAG).add(eowHold);
+ WindowTracing.trace(
+ "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
+ + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+ eowHold, context.key(), context.window(), inputWM, outputWM);
+ return eowHold;
+ }
+
+ /**
+ * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at
+ * which the hold was added (ie the end of window time plus allowed lateness),
+ * or {@literal null} if no hold was added.
+ *
+ * <p>We only add the hold if it is distinct from what would be added by
+ * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
+ * must be non-zero.
+ *
+ * <p>A garbage collection hold is added in two situations:
+ * <ol>
+ * <li>An incoming element came in behind the output watermark, and was too late for placing
+ * the usual element hold or an end of window hold. Place the garbage collection hold so that
+ * we can guarantee when the pane is finally triggered its output will not be dropped due to
+ * excessive lateness by any downstream computation.
+ * <li>The {@link WindowingStrategy#getClosingBehavior()} is
+ * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
+ * for all windows which saw at least one element. Again, the garbage collection hold guarantees
+ * that any empty final pane can be given a timestamp which will not be considered beyond
+ * allowed lateness by any downstream computation.
+ * </ol>
+ *
+ * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
+ */
+ @Nullable
+ private Instant addGarbageCollectionHold(
+ ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
+ Instant outputWM = timerInternals.currentOutputWatermarkTime();
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+ Instant eow = context.window().maxTimestamp();
+ Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness());
+
+ if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
+ WindowTracing.trace(
+ "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
+ + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
+ + "outputWatermark:{}",
+ gcHold, context.key(), context.window(), inputWM, outputWM);
+ return null;
+ }
+
+ if (paneIsEmpty && context.windowingStrategy().getClosingBehavior()
+ == ClosingBehavior.FIRE_IF_NON_EMPTY) {
+ WindowTracing.trace(
+ "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
+ + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; "
+ + "outputWatermark:{}",
+ gcHold, context.key(), context.window(), inputWM, outputWM);
+ return null;
+ }
+
+ Preconditions.checkState(!gcHold.isBefore(inputWM),
+ "Garbage collection hold %s cannot be before input watermark %s",
+ gcHold, inputWM);
+ Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Garbage collection hold %s is beyond end-of-time", gcHold);
+ // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above.
+ context.state().access(EXTRA_HOLD_TAG).add(gcHold);
+
+ WindowTracing.trace(
+ "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for "
+ + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+ gcHold, context.key(), context.window(), inputWM, outputWM);
+ return gcHold;
+ }
+
+ /**
+ * Prefetch watermark holds in preparation for merging.
+ */
+ public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
+ StateMerging.prefetchWatermarks(state, elementHoldTag);
+ }
+
+ /**
+ * Updates the watermark hold when windows merge if it is possible the merged value does
+ * not equal all of the existing holds. For example, if the new window implies a later
+ * watermark hold, then earlier holds may be released.
+ */
+ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
+ WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; "
+ + "outputWatermark:{}",
+ context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window());
+ // If we had a cheap way to determine if we have an element hold then we could
+ // avoid adding an unnecessary end-of-window or garbage collection hold.
+ // Simply reading the above merged watermark would impose an additional read for the
+ // common case that the active window has just one underlying state address window and
+ // the hold depends on the min of the element timestamps.
+ // At least one merged window must be non-empty for the merge to have been triggered.
+ StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
+ addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
+ }
+
+ /**
+ * Result of {@link #extractAndRelease}.
+ */
+ public static class OldAndNewHolds {
+ public final Instant oldHold;
+ @Nullable
+ public final Instant newHold;
+
+ public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
+ this.oldHold = oldHold;
+ this.newHold = newHold;
+ }
+ }
+
+ /**
+ * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
+ * reading, but add/restore an end-of-window or garbage collection hold if required.
+ *
+ * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn}
+ * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
+ * elements in the current pane. If there is no such value the timestamp is the end
+ * of the window.
+ */
+ public ReadableState<OldAndNewHolds> extractAndRelease(
+ final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
+ WindowTracing.debug(
+ "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; "
+ + "outputWatermark:{}",
+ context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
+ final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
+ return new ReadableState<OldAndNewHolds>() {
+ @Override
+ public ReadableState<OldAndNewHolds> readLater() {
+ elementHoldState.readLater();
+ extraHoldState.readLater();
+ return this;
+ }
+
+ @Override
+ public OldAndNewHolds read() {
+ // Read both the element and extra holds.
+ Instant elementHold = elementHoldState.read();
+ Instant extraHold = extraHoldState.read();
+ Instant oldHold;
+ // Find the minimum, accounting for null.
+ if (elementHold == null) {
+ oldHold = extraHold;
+ } else if (extraHold == null) {
+ oldHold = elementHold;
+ } else if (elementHold.isBefore(extraHold)) {
+ oldHold = elementHold;
+ } else {
+ oldHold = extraHold;
+ }
+ if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
+ // If no hold (eg because all elements came in behind the output watermark), or
+ // the hold was for garbage collection, take the end of window as the result.
+ WindowTracing.debug(
+ "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
+ + "for key:{}; window:{}",
+ oldHold, context.key(), context.window());
+ oldHold = context.window().maxTimestamp();
+ }
+ WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
+ context.key(), context.window());
+
+ // Clear the underlying state to allow the output watermark to progress.
+ elementHoldState.clear();
+ extraHoldState.clear();
+
+ @Nullable Instant newHold = null;
+ if (!isFinished) {
+ // Only need to leave behind an end-of-window or garbage collection hold
+ // if future elements will be processed.
+ newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
+ }
+
+ return new OldAndNewHolds(oldHold, newHold);
+ }
+ };
+ }
+
+ /**
+ * Clear any remaining holds.
+ */
+ public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
+ WindowTracing.debug(
+ "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+ context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ context.state().access(elementHoldTag).clear();
+ context.state().access(EXTRA_HOLD_TAG).clear();
+ }
+
+ /**
+ * Return the current data hold, or null if none. Does not clear. For debugging only.
+ */
+ @Nullable
+ public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
+ return context.state().access(elementHoldTag).read();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
new file mode 100644
index 0000000..3e1528f
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaceForTest;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link BatchTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class BatchTimerInternalsTest {
+
+ private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+
+ @Mock
+ private ReduceFnRunner<?, ?, ?, ?> mockRunner;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testFiringTimers() throws Exception {
+ BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
+ TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+ underTest.setTimer(processingTime1);
+ underTest.setTimer(processingTime2);
+
+ underTest.advanceProcessingTime(mockRunner, new Instant(20));
+ Mockito.verify(mockRunner).onTimer(processingTime1);
+ Mockito.verifyNoMoreInteractions(mockRunner);
+
+ // Advancing just a little shouldn't refire
+ underTest.advanceProcessingTime(mockRunner, new Instant(21));
+ Mockito.verifyNoMoreInteractions(mockRunner);
+
+ // Adding the timer and advancing a little should refire
+ underTest.setTimer(processingTime1);
+ Mockito.verify(mockRunner).onTimer(processingTime1);
+ underTest.advanceProcessingTime(mockRunner, new Instant(21));
+ Mockito.verifyNoMoreInteractions(mockRunner);
+
+ // And advancing the rest of the way should still have the other timer
+ underTest.advanceProcessingTime(mockRunner, new Instant(30));
+ Mockito.verify(mockRunner).onTimer(processingTime2);
+ Mockito.verifyNoMoreInteractions(mockRunner);
+ }
+
+ @Test
+ public void testTimerOrdering() throws Exception {
+ BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
+ TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
+ TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+ underTest.setTimer(processingTime1);
+ underTest.setTimer(watermarkTime1);
+ underTest.setTimer(processingTime2);
+ underTest.setTimer(watermarkTime2);
+
+ underTest.advanceInputWatermark(mockRunner, new Instant(30));
+ Mockito.verify(mockRunner).onTimer(watermarkTime1);
+ Mockito.verify(mockRunner).onTimer(watermarkTime2);
+ Mockito.verifyNoMoreInteractions(mockRunner);
+
+ underTest.advanceProcessingTime(mockRunner, new Instant(30));
+ Mockito.verify(mockRunner).onTimer(processingTime1);
+ Mockito.verify(mockRunner).onTimer(processingTime2);
+ Mockito.verifyNoMoreInteractions(mockRunner);
+ }
+
+ @Test
+ public void testDeduplicate() throws Exception {
+ BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
+ TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ underTest.setTimer(watermarkTime);
+ underTest.setTimer(watermarkTime);
+ underTest.setTimer(processingTime);
+ underTest.setTimer(processingTime);
+ underTest.advanceProcessingTime(mockRunner, new Instant(20));
+ underTest.advanceInputWatermark(mockRunner, new Instant(20));
+
+ Mockito.verify(mockRunner).onTimer(processingTime);
+ Mockito.verify(mockRunner).onTimer(watermarkTime);
+ Mockito.verifyNoMoreInteractions(mockRunner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
new file mode 100644
index 0000000..f653f49
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Properties of {@link GroupAlsoByWindowsDoFn}.
+ *
+ * <p>Some properties may not hold of some implementations, due to restrictions on the context
+ * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
+ * support merging windows.
+ */
+public class GroupAlsoByWindowsProperties {
+
+ /**
+ * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide
+ * the appropriate windowing strategy under test.
+ */
+ public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
+ <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
+ forStrategy(WindowingStrategy<?, W> strategy);
+ }
+
+ /**
+ * Tests that for empty input and the given {@link WindowingStrategy}, the provided GABW
+ * implementation produces no output.
+ *
+ * <p>The input type is deliberately left as a wildcard, since it is not relevant.
+ */
+ public static <K, InputT, OutputT> void emptyInputEmptyOutput(
+ GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
+
+ DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW(
+ gabwFactory,
+ windowingStrategy,
+ (K) null, // key should never be used
+ Collections.<WindowedValue<InputT>>emptyList());
+
+ assertThat(result.peekOutputElements(), hasSize(0));
+ }
+
+ /**
+ * Tests that for a simple sequence of elements on the same key, the given GABW implementation
+ * correctly groups them according to fixed windows.
+ */
+ public static void groupsElementsIntoFixedWindows(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
+
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "key",
+ WindowedValue.of(
+ "v1",
+ new Instant(1),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(2),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v3",
+ new Instant(13),
+ Arrays.asList(window(10, 20)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+ assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
+ assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+ assertThat(item1.getValue().getValue(), contains("v3"));
+ assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
+ }
+
+ /**
+ * Tests that for a simple sequence of elements on the same key, the given GABW implementation
+ * correctly groups them into sliding windows.
+ *
+ * <p>In the input here, each element occurs in multiple windows.
+ */
+ public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
+ SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "key",
+ WindowedValue.of(
+ "v1",
+ new Instant(5),
+ Arrays.asList(window(-10, 10), window(0, 20)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(15),
+ Arrays.asList(window(0, 20), window(10, 30)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(3));
+
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
+ assertThat(item0.getValue().getValue(), contains("v1"));
+ assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
+ assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2"));
+ assertThat(item1.getTimestamp(), equalTo(new Instant(10)));
+
+ TimestampedValue<KV<String, Iterable<String>>> item2 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
+ assertThat(item2.getValue().getValue(), contains("v2"));
+ assertThat(item2.getTimestamp(), equalTo(new Instant(20)));
+ }
+
+ /**
+ * Tests that for a simple sequence of elements on the same key, the given GABW implementation
+ * correctly groups and combines them according to sliding windows.
+ *
+ * <p>In the input here, each element occurs in multiple windows.
+ */
+ public static void combinesElementsInSlidingWindows(
+ GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
+ CombineFn<Long, ?, Long> combineFn)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+
+ DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
+ runGABW(gabwFactory, windowingStrategy, "k",
+ WindowedValue.of(
+ 1L,
+ new Instant(5),
+ Arrays.asList(window(-10, 10), window(0, 20)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ 2L,
+ new Instant(15),
+ Arrays.asList(window(0, 20), window(10, 30)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ 4L,
+ new Instant(18),
+ Arrays.asList(window(0, 20), window(10, 30)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(3));
+
+ TimestampedValue<KV<String, Long>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
+ assertThat(item0.getValue().getKey(), equalTo("k"));
+ assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L))));
+ assertThat(item0.getTimestamp(), equalTo(new Instant(5L)));
+
+ TimestampedValue<KV<String, Long>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
+ assertThat(item1.getValue().getKey(), equalTo("k"));
+ assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L))));
+ assertThat(item1.getTimestamp(), equalTo(new Instant(5L)));
+
+ TimestampedValue<KV<String, Long>> item2 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
+ assertThat(item2.getValue().getKey(), equalTo("k"));
+ assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L))));
+ assertThat(item2.getTimestamp(), equalTo(new Instant(15L)));
+ }
+
+ /**
+ * Tests that the given GABW implementation correctly groups elements that fall into overlapping
+ * windows that are not merged.
+ */
+ public static void groupsIntoOverlappingNonmergingWindows(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
+
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "key",
+ WindowedValue.of(
+ "v1",
+ new Instant(1),
+ Arrays.asList(window(0, 5)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(4),
+ Arrays.asList(window(1, 5)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v3",
+ new Instant(4),
+ Arrays.asList(window(0, 5)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5)));
+ assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
+ assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5)));
+ assertThat(item1.getValue().getValue(), contains("v2"));
+ assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
+ }
+
+ /**
+ * Tests that the given GABW implementation correctly groups elements into merged sessions.
+ */
+ public static void groupsElementsInMergedSessions(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
+
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "key",
+ WindowedValue.of(
+ "v1",
+ new Instant(0),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(5),
+ Arrays.asList(window(5, 15)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v3",
+ new Instant(15),
+ Arrays.asList(window(15, 25)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+ assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
+ assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+ assertThat(item1.getValue().getValue(), contains("v3"));
+ assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
+ }
+
+ /**
+ * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
+ * session window correctly according to the provided {@link CombineFn}.
+ */
+ public static void combinesElementsPerSession(
+ GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
+ CombineFn<Long, ?, Long> combineFn)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
+
+ DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
+ runGABW(gabwFactory, windowingStrategy, "k",
+ WindowedValue.of(
+ 1L,
+ new Instant(0),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ 2L,
+ new Instant(5),
+ Arrays.asList(window(5, 15)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ 4L,
+ new Instant(15),
+ Arrays.asList(window(15, 25)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ TimestampedValue<KV<String, Long>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+ assertThat(item0.getValue().getKey(), equalTo("k"));
+ assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
+ assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
+
+ TimestampedValue<KV<String, Long>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+ assertThat(item1.getValue().getKey(), equalTo("k"));
+ assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
+ assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
+ }
+
+ /**
+ * Tests that for a simple sequence of elements on the same key, the given GABW implementation
+ * correctly groups them according to fixed windows and also sets the output timestamp
+ * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
+ */
+ public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "key",
+ WindowedValue.of(
+ "v1",
+ new Instant(1),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(2),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v3",
+ new Instant(13),
+ Arrays.asList(window(10, 20)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+ assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
+ assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+ assertThat(item1.getValue().getValue(), contains("v3"));
+ assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
+ }
+
+ /**
+ * Tests that for a simple sequence of elements on the same key, the given GABW implementation
+ * correctly groups them according to fixed windows and also sets the output timestamp
+ * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
+ */
+ public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "k",
+ WindowedValue.of(
+ "v1",
+ new Instant(1),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(2),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v3",
+ new Instant(13),
+ Arrays.asList(window(10, 20)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+ assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
+ assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+ assertThat(item1.getValue().getValue(), contains("v3"));
+ assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
+ }
+
+ /**
+ * Tests that the given GABW implementation correctly groups elements into merged sessions
+ * with output timestamps at the end of the merged window.
+ */
+ public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "k",
+ WindowedValue.of(
+ "v1",
+ new Instant(0),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(5),
+ Arrays.asList(window(5, 15)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v3",
+ new Instant(15),
+ Arrays.asList(window(15, 25)),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+ assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
+ assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+ assertThat(item1.getValue().getValue(), contains("v3"));
+ assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
+ }
+
+ /**
+ * Tests that the given GABW implementation correctly groups elements into merged sessions
+ * with output timestamps at the end of the merged window.
+ */
+ public static void groupsElementsInMergedSessionsWithLatestTimestamp(
+ GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+
+ BoundedWindow unmergedWindow = window(15, 25);
+ DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
+ runGABW(gabwFactory, windowingStrategy, "k",
+ WindowedValue.of(
+ "v1",
+ new Instant(0),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v2",
+ new Instant(5),
+ Arrays.asList(window(5, 15)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ "v3",
+ new Instant(15),
+ Arrays.asList(unmergedWindow),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ BoundedWindow mergedWindow = window(0, 15);
+ TimestampedValue<KV<String, Iterable<String>>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow));
+ assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
+ assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
+
+ TimestampedValue<KV<String, Iterable<String>>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow));
+ assertThat(item1.getValue().getValue(), contains("v3"));
+ assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
+ }
+
+ /**
+ * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
+ * session window correctly according to the provided {@link CombineFn}.
+ */
+ public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
+ GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
+ CombineFn<Long, ?, Long> combineFn)
+ throws Exception {
+
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+
+ BoundedWindow secondWindow = window(15, 25);
+ DoFnTester<?, KV<String, Long>> result =
+ runGABW(gabwFactory, windowingStrategy, "k",
+ WindowedValue.of(
+ 1L,
+ new Instant(0),
+ Arrays.asList(window(0, 10)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ 2L,
+ new Instant(5),
+ Arrays.asList(window(5, 15)),
+ PaneInfo.NO_FIRING),
+ WindowedValue.of(
+ 4L,
+ new Instant(15),
+ Arrays.asList(secondWindow),
+ PaneInfo.NO_FIRING));
+
+ assertThat(result.peekOutputElements(), hasSize(2));
+
+ BoundedWindow firstResultWindow = window(0, 15);
+ TimestampedValue<KV<String, Long>> item0 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow));
+ assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
+ assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp()));
+
+ TimestampedValue<KV<String, Long>> item1 =
+ Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow));
+ assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
+ assertThat(item1.getTimestamp(),
+ equalTo(secondWindow.maxTimestamp()));
+ }
+
+ @SafeVarargs
+ private static <K, InputT, OutputT, W extends BoundedWindow>
+ DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
+ GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
+ WindowingStrategy<?, W> windowingStrategy,
+ K key,
+ WindowedValue<InputT>... values) throws Exception {
+ return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values));
+ }
+
+ private static <K, InputT, OutputT, W extends BoundedWindow>
+ DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
+ GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
+ WindowingStrategy<?, W> windowingStrategy,
+ K key,
+ Collection<WindowedValue<InputT>> values) throws Exception {
+
+ TupleTag<KV<K, OutputT>> outputTag = new TupleTag<>();
+ DoFnRunnerBase.ListOutputManager outputManager = new DoFnRunnerBase.ListOutputManager();
+
+ DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester =
+ DoFnTester.of(gabwFactory.forStrategy(windowingStrategy));
+ tester.startBundle();
+ tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
+ tester.finishBundle();
+
+ // Sanity check for corruption
+ for (KV<K, OutputT> elem : tester.peekOutputElements()) {
+ assertThat(elem.getKey(), equalTo(key));
+ }
+
+ return tester;
+ }
+
+ private static BoundedWindow window(long start, long end) {
+ return new IntervalWindow(new Instant(start), new Instant(end));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
new file mode 100644
index 0000000..4ac6164
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link GroupAlsoByWindowsViaOutputBufferDoFn}.
+ */
+@RunWith(JUnit4.class)
+public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
+
+ private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
+ implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
+
+ private final Coder<InputT> inputCoder;
+
+ public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
+ this.inputCoder = inputCoder;
+ }
+
+ @Override
+ public <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W>
+ forStrategy(WindowingStrategy<?, W> windowingStrategy) {
+ return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
+ windowingStrategy,
+ SystemReduceFn.<K, InputT, W>buffering(inputCoder));
+ }
+ }
+
+ @Test
+ public void testEmptyInputEmptyOutput() throws Exception {
+ GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
+ new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsElementsIntoFixedWindows() throws Exception {
+ GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsElementsIntoSlidingWindows() throws Exception {
+ GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
+ GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsIntoSessions() throws Exception {
+ GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception {
+ GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception {
+ GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
+ GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+
+ @Test
+ public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception {
+ GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
+ new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
new file mode 100644
index 0000000..d929d39
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.LateDataDroppingDoFnRunner.LateDataFilter;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+
+/**
+ * Unit tests for {@link LateDataDroppingDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class LateDataDroppingDoFnRunnerTest {
+ private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10));
+
+ @Mock private TimerInternals mockTimerInternals;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testLateDataFilter() throws Exception {
+ when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15L));
+
+ InMemoryLongSumAggregator droppedDueToLateness =
+ new InMemoryLongSumAggregator("droppedDueToLateness");
+ LateDataFilter lateDataFilter = new LateDataFilter(
+ WindowingStrategy.of(WINDOW_FN), mockTimerInternals, droppedDueToLateness);
+
+ Iterable<WindowedValue<Integer>> actual = lateDataFilter.filter(
+ "a",
+ ImmutableList.of(
+ createDatum(13, 13L),
+ createDatum(5, 5L), // late element, earlier than 4L.
+ createDatum(16, 16L),
+ createDatum(18, 18L)));
+
+ Iterable<WindowedValue<Integer>> expected = ImmutableList.of(
+ createDatum(13, 13L),
+ createDatum(16, 16L),
+ createDatum(18, 18L));
+ assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
+ assertEquals(1, droppedDueToLateness.sum);
+ }
+
+ private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {
+ Instant timestamp = new Instant(timestampMillis);
+ return WindowedValue.of(
+ element,
+ timestamp,
+ Arrays.asList(WINDOW_FN.assignWindow(timestamp)),
+ PaneInfo.NO_FIRING);
+ }
+
+ private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
+ private final String name;
+ private long sum = 0;
+
+ public InMemoryLongSumAggregator(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void addValue(Long value) {
+ sum += value;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public CombineFn<Long, ?, Long> getCombineFn() {
+ return new Sum.SumLongFn();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..8885118
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link PushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class PushbackSideInputDoFnRunnerTest {
+ @Mock private ReadyCheckingSideInputReader reader;
+ private TestDoFnRunner<Integer, Integer> underlying;
+ private PCollectionView<Integer> singletonView;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+ singletonView =
+ created
+ .apply(Window.into(new IdentitySideInputWindowFn()))
+ .apply(Sum.integersGlobally().asSingletonView());
+
+ underlying = new TestDoFnRunner<>();
+ }
+
+ private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+ ImmutableList<PCollectionView<?>> views) {
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ PushbackSideInputDoFnRunner.create(underlying, views, reader);
+ runner.startBundle();
+ return runner;
+ }
+
+ @Test
+ public void startFinishBundleDelegates() {
+ PushbackSideInputDoFnRunner runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ assertThat(underlying.started, is(true));
+ assertThat(underlying.finished, is(false));
+ runner.finishBundle();
+ assertThat(underlying.finished, is(true));
+ }
+
+ @Test
+ public void processElementSideInputNotReady() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(false);
+
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ WindowedValue<Integer> oneWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> oneWindowPushback =
+ runner.processElementInReadyWindows(oneWindow);
+ assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+ assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+ }
+
+ @Test
+ public void processElementSideInputNotReadyMultipleWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(false);
+
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+ assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+ }
+
+ @Test
+ public void processElementSideInputNotReadySomeWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+ .thenReturn(false);
+ when(
+ reader.isReady(
+ Mockito.eq(singletonView),
+ org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+ .thenReturn(true);
+
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+ IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+ IntervalWindow bigWindow =
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+ PaneInfo.NO_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(
+ multiWindowPushback,
+ containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+ assertThat(underlying.inputElems,
+ containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
+ WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+ }
+
+ @Test
+ public void processElementSideInputReadyAllWindows() {
+ when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+ .thenReturn(true);
+
+ ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+ PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, emptyIterable());
+ assertThat(underlying.inputElems,
+ containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+ }
+
+ @Test
+ public void processElementNoSideInputs() {
+ PushbackSideInputDoFnRunner<Integer, Integer> runner =
+ createRunner(ImmutableList.<PCollectionView<?>>of());
+
+ WindowedValue<Integer> multiWindow =
+ WindowedValue.of(
+ 2,
+ new Instant(-2),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(-500L), new Instant(0L)),
+ new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+ GlobalWindow.INSTANCE),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ Iterable<WindowedValue<Integer>> multiWindowPushback =
+ runner.processElementInReadyWindows(multiWindow);
+ assertThat(multiWindowPushback, emptyIterable());
+ assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+ }
+
+ private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+ List<WindowedValue<InputT>> inputElems;
+ private boolean started = false;
+ private boolean finished = false;
+
+ @Override
+ public void startBundle() {
+ started = true;
+ inputElems = new ArrayList<>();
+ }
+
+ @Override
+ public void processElement(WindowedValue<InputT> elem) {
+ inputElems.add(elem);
+ }
+
+ @Override
+ public void finishBundle() {
+ finished = true;
+ }
+ }
+}