You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/22 02:22:18 UTC
[04/12] incubator-beam git commit: Move some easy stuff into
runners/core-java
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
deleted file mode 100644
index 864e8e7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ /dev/null
@@ -1,985 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks;
-import org.apache.beam.sdk.util.ReduceFnContextFactory.StateStyle;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
- * {@link PCollection} by key.
- *
- * <p>The {@link #onTrigger} method relies on a {@link TriggerRunner} to manage the execution of
- * the triggering logic. The {@code ReduceFnRunner}'s responsibilities are:
- *
- * <ul>
- * <li>Tracking the windows that are active (have buffered data) as elements arrive and
- * triggers are fired.
- * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
- * when the trigger fires.
- * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
- * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions
- * such as output.
- * <li>Scheduling garbage collection of state associated with a specific window, and making that
- * happen when the appropriate timer fires.
- * </ul>
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
-
- /**
- * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
- *
- * <ul>
- * <li>It runs the trigger from the {@link WindowingStrategy}.</li>
- * <li>It merges windows according to the {@link WindowingStrategy}.</li>
- * <li>It chooses how to track active windows and clear out expired windows
- * according to the {@link WindowingStrategy}, based on the allowed lateness and
- * whether windows can merge.</li>
- * <li>It decides whether to emit empty final panes according to whether the
- * {@link WindowingStrategy} requires it.</li>
- * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li>
- * </ul>
- */
- private final WindowingStrategy<Object, W> windowingStrategy;
-
- private final OutputWindowedValue<KV<K, OutputT>> outputter;
-
- private final StateInternals<K> stateInternals;
-
- private final Aggregator<Long, Long> droppedDueToClosedWindow;
-
- private final K key;
-
- /**
- * Track which windows are still active and the 'state address' windows which hold their state.
- *
- * <ul>
- * <li>State: Global map for all active windows for this computation and key.
- * <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within
- * the active window set until its trigger is closed or the window is garbage collected.
- * </ul>
- */
- private final ActiveWindowSet<W> activeWindows;
-
- /**
- * Always a {@link SystemReduceFn}.
- *
- * <ul>
- * <li>State: A bag of accumulated values, or the intermediate result of a combiner.
- * <li>State style: RENAMED
- * <li>Merging: Concatenate or otherwise combine the state from each merged window.
- * <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared
- * when trigger is finished or when the window is garbage collected.
- * </ul>
- */
- private final ReduceFn<K, InputT, OutputT, W> reduceFn;
-
- /**
- * Manage the setting and firing of timer events.
- *
- * <ul>
- * <li>Merging: End-of-window and garbage collection timers are cancelled when windows are
- * merged away. Timers created by triggers are never garbage collected and are left to
- * fire and be ignored.
- * <li>Lifetime: Timers automatically disappear after they fire.
- * </ul>
- */
- private final TimerInternals timerInternals;
-
- /**
- * Manage the execution and state for triggers.
- *
- * <ul>
- * <li>State: Tracks which sub-triggers have finished, and any additional state needed to
- * determine when the trigger should fire.
- * <li>State style: DIRECT
- * <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as
- * needed.
- * <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However
- * the finished bits are left behind and must be cleared when the window is
- * garbage collected.
- * </ul>
- */
- private final TriggerRunner<W> triggerRunner;
-
- /**
- * Store the output watermark holds for each window.
- *
- * <ul>
- * <li>State: Bag of hold timestamps.
- * <li>State style: RENAMED
- * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
- * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
- * hold.
- * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
- * </ul>
- */
- private final WatermarkHold<W> watermarkHold;
-
- private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
-
- /**
- * Store the previously emitted pane (if any) for each window.
- *
- * <ul>
- * <li>State: The previous {@link PaneInfo} passed to the user's {@link DoFn#processElement},
- * if any.
- * <li>State style: DIRECT
- * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
- * Cleared when window is merged away.
- * <li>Lifetime: Cleared when trigger is closed or window is garbage collected.
- * </ul>
- */
- private final PaneInfoTracker paneInfoTracker;
-
- /**
- * Store whether we've seen any elements for a window since the last pane was emitted.
- *
- * <ul>
- * <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far.
- * <li>State style: RENAMED.
- * <li>Merging: Counts are summed when windows are merged.
- * <li>Lifetime: Cleared when pane fires or window is garbage collected.
- * </ul>
- */
- private final NonEmptyPanes<K, W> nonEmptyPanes;
-
- public ReduceFnRunner(
- K key,
- WindowingStrategy<?, W> windowingStrategy,
- StateInternals<K> stateInternals,
- TimerInternals timerInternals,
- WindowingInternals<?, KV<K, OutputT>> windowingInternals,
- Aggregator<Long, Long> droppedDueToClosedWindow,
- ReduceFn<K, InputT, OutputT, W> reduceFn,
- PipelineOptions options) {
- this.key = key;
- this.timerInternals = timerInternals;
- this.paneInfoTracker = new PaneInfoTracker(timerInternals);
- this.stateInternals = stateInternals;
- this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
- this.droppedDueToClosedWindow = droppedDueToClosedWindow;
- this.reduceFn = reduceFn;
-
- @SuppressWarnings("unchecked")
- WindowingStrategy<Object, W> objectWindowingStrategy =
- (WindowingStrategy<Object, W>) windowingStrategy;
- this.windowingStrategy = objectWindowingStrategy;
-
- this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
-
- // Note this may incur I/O to load persisted window set data.
- this.activeWindows = createActiveWindowSet();
-
- this.contextFactory =
- new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy,
- stateInternals, this.activeWindows, timerInternals, windowingInternals, options);
-
- this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
- this.triggerRunner =
- new TriggerRunner<>(
- windowingStrategy.getTrigger(),
- new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows));
- }
-
- private ActiveWindowSet<W> createActiveWindowSet() {
- return windowingStrategy.getWindowFn().isNonMerging()
- ? new NonMergingActiveWindowSet<W>()
- : new MergingActiveWindowSet<W>(windowingStrategy.getWindowFn(), stateInternals);
- }
-
- @VisibleForTesting
- boolean isFinished(W window) {
- return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state());
- }
-
- @VisibleForTesting
- boolean hasNoActiveWindows() {
- return activeWindows.getActiveAndNewWindows().isEmpty();
- }
-
- /**
- * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
- * triggers, and window merging.
- *
- * <p>The general strategy is:
- * <ol>
- * <li>Use {@link WindowedValue#getWindows} (itself determined using
- * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some
- * of those windows will already have state associated with them. The rest are considered
- * NEW.
- * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows.
- * Each NEW window will become either ACTIVE or be discarded.
- * (See {@link ActiveWindowSet} for definitions of these terms.)
- * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address
- * windows before any state is associated with the NEW window. In the common case that
- * windows for new elements are merged into existing ACTIVE windows then no additional
- * storage or merging overhead will be incurred.
- * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their
- * states can be merged on-demand when a pane fires.
- * <li>Process the element for each of the windows its windows have been merged into according
- * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers,
- * setting holds, and invoking {@link ReduceFn#onTrigger}.
- * </ol>
- */
- public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
- // If an incoming element introduces a new window, attempt to merge it into an existing
- // window eagerly.
- Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
-
- Set<W> windowsToConsider = new HashSet<>();
-
- // Process each element, using the updated activeWindows determined by collectAndMergeWindows.
- for (WindowedValue<InputT> value : values) {
- windowsToConsider.addAll(processElement(windowToMergeResult, value));
- }
-
- // Trigger output from any window for which the trigger is ready
- for (W mergedWindow : windowsToConsider) {
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(mergedWindow, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(mergedWindow, StateStyle.RENAMED);
- triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
- emitIfAppropriate(directContext, renamedContext);
- }
-
- // We're all done with merging and emitting elements so can compress the activeWindow state.
- // Any windows which are still NEW must have come in on a new element which was then discarded
- // due to the window's trigger being closed. We can thus delete them.
- activeWindows.cleanupTemporaryWindows();
- }
-
- public void persist() {
- activeWindows.persist();
- }
-
- /**
- * Extract the windows associated with the values, and invoke merge. Return a map
- * from windows to the merge result window. If a window is not in the domain of
- * the result map then it did not get merged into a different window.
- */
- private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
- throws Exception {
- // No-op if no merging can take place
- if (windowingStrategy.getWindowFn().isNonMerging()) {
- return ImmutableMap.of();
- }
-
- // Collect the windows from all elements (except those which are too late) and
- // make sure they are already in the active window set or are added as NEW windows.
- for (WindowedValue<?> value : values) {
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
-
- // For backwards compat with pre 1.4 only.
- // We may still have ACTIVE windows with multiple state addresses, representing
- // a window whose state has not yet been eagerly merged.
- // We'll go ahead and merge that state now so that we don't have to worry about
- // this legacy case anywhere else.
- if (activeWindows.isActive(window)) {
- Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
- if (stateAddressWindows.size() > 1) {
- // This is a legacy window whose state has not been eagerly merged.
- // Do that now.
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
- contextFactory.forPremerge(window);
- reduceFn.onMerge(premergeContext);
- watermarkHold.onMerge(premergeContext);
- activeWindows.merged(window);
- }
- }
-
- // Add this window as NEW if it is not currently ACTIVE.
- // If we had already seen this window and closed its trigger, then the
- // window will not be currently ACTIVE. It will then be added as NEW here,
- // and fall into the merging logic as usual.
- activeWindows.ensureWindowExists(window);
- }
- }
-
- // Merge all of the active windows and retain a mapping from source windows to result windows.
- Map<W, W> windowToMergeResult = new HashMap<>();
- activeWindows.merge(new OnMergeCallback(windowToMergeResult));
- return windowToMergeResult;
- }
-
- private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
- private final Map<W, W> windowToMergeResult;
-
- OnMergeCallback(Map<W, W> windowToMergeResult) {
- this.windowToMergeResult = windowToMergeResult;
- }
-
- /**
- * Return the subset of {@code windows} which are currently ACTIVE. We only need to worry
- * about merging state from ACTIVE windows. NEW windows by definition have no existing state.
- */
- private List<W> activeWindows(Iterable<W> windows) {
- List<W> active = new ArrayList<>();
- for (W window : windows) {
- if (activeWindows.isActive(window)) {
- active.add(window);
- }
- }
- return active;
- }
-
- /**
- * Called from the active window set to indicate {@code toBeMerged} (of which only
- * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) will later
- * be merged into {@code mergeResult}.
- */
- @Override
- public void prefetchOnMerge(
- Collection<W> toBeMerged, W mergeResult) throws Exception {
- List<W> activeToBeMerged = activeWindows(toBeMerged);
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
-
- // Prefetch various state.
- triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state());
- reduceFn.prefetchOnMerge(renamedMergeContext.state());
- watermarkHold.prefetchOnMerge(renamedMergeContext.state());
- nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
- }
-
- /**
- * Called from the active window set to indicate {@code toBeMerged} (of which only
- * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) are about
- * to be merged into {@code mergeResult}.
- */
- @Override
- public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
- // Remember we have merged these windows.
- for (W window : toBeMerged) {
- windowToMergeResult.put(window, mergeResult);
- }
-
- // At this point activeWindows has NOT incorporated the results of the merge.
- List<W> activeToBeMerged = activeWindows(toBeMerged);
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
- contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
-
- // Run the reduceFn to perform any needed merging.
- reduceFn.onMerge(renamedMergeContext);
-
- // Merge the watermark holds.
- watermarkHold.onMerge(renamedMergeContext);
-
- // Merge non-empty pane state.
- nonEmptyPanes.onMerge(renamedMergeContext.state());
-
- // Have the trigger merge state as needed.
- triggerRunner.onMerge(
- directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
-
- for (W active : activeToBeMerged) {
- if (active.equals(mergeResult)) {
- // Not merged away.
- continue;
- }
- // Cleanup flavor A: Currently ACTIVE window is about to be merged away.
- // Clear any state not already cleared by the onMerge calls above.
- WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult);
- ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
- contextFactory.base(active, StateStyle.DIRECT);
- // No need for the end-of-window or garbage collection timers.
- // We will establish a new end-of-window or garbage collection timer for the mergeResult
- // window in processElement below. There must be at least one element for the mergeResult
- // window since a new element with a new window must have triggered this onMerge.
- cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
- // We no longer care about any previous panes of merged away windows. The
- // merge result window gets to start fresh if it is new.
- paneInfoTracker.clear(directClearContext.state());
- }
- }
- }
-
- /**
- * Process an element.
- *
- * @param value the value being processed
- * @return the set of windows in which the element was actually processed
- */
- private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
- throws Exception {
- // Redirect element windows to the ACTIVE windows they have been merged into.
- // The compressed representation (value, {window1, window2, ...}) actually represents
- // distinct elements (value, window1), (value, window2), ...
- // so if window1 and window2 merge, the resulting window will contain both copies
- // of the value.
- Collection<W> windows = new ArrayList<>();
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
- W mergeResult = windowToMergeResult.get(window);
- if (mergeResult == null) {
- mergeResult = window;
- }
- windows.add(mergeResult);
- }
-
- // Prefetch in each of the windows if we're going to need to process triggers
- for (W window : windows) {
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
- triggerRunner.prefetchForValue(window, directContext.state());
- }
-
- // Process the element for each (mergeResultWindow, not closed) window it belongs to.
- List<W> triggerableWindows = new ArrayList<>(windows.size());
- for (W window : windows) {
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
- if (triggerRunner.isClosed(directContext.state())) {
- // This window has already been closed.
- droppedDueToClosedWindow.addValue(1L);
- WindowTracing.debug(
- "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
- + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
- value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- continue;
- }
-
- triggerableWindows.add(window);
- activeWindows.ensureWindowIsActive(window);
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
-
- nonEmptyPanes.recordContent(renamedContext.state());
-
- // Make sure we've scheduled the end-of-window or garbage collection timer for this window.
- Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
-
- // Hold back progress of the output watermark until we have processed the pane this
- // element will be included within. If the element is too late for that, place a hold at
- // the end-of-window or garbage collection time to allow empty panes to contribute elements
- // which won't be dropped due to lateness by a following computation (assuming the following
- // computation uses the same allowed lateness value...)
- @Nullable Instant hold = watermarkHold.addHolds(renamedContext);
-
- if (hold != null) {
- // Assert that holds have a proximate timer.
- boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
- boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
- Preconditions.checkState(
- holdInWindow == timerInWindow,
- "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
- hold,
- timer,
- directContext.window());
- }
-
- // Execute the reduceFn, which will buffer the value as appropriate
- reduceFn.processValue(renamedContext);
-
- // Run the trigger to update its state
- triggerRunner.processValue(
- directContext.window(),
- directContext.timestamp(),
- directContext.timers(),
- directContext.state());
-
- // At this point, if triggerRunner.shouldFire before the processValue then
- // triggerRunner.shouldFire after the processValue. In other words adding values
- // cannot take a trigger state from firing to non-firing.
- // (We don't actually assert this since it is too slow.)
- }
-
- return triggerableWindows;
- }
-
- /**
- * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
- */
- public void onTimer(TimerData timer) throws Exception {
- // Which window is the timer for?
- Preconditions.checkArgument(timer.getNamespace() instanceof WindowNamespace,
- "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
- @SuppressWarnings("unchecked")
- WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
- W window = windowNamespace.getWindow();
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(window, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(window, StateStyle.RENAMED);
-
- // Has this window had its trigger finish?
- // - The trigger may implement isClosed as constant false.
- // - If the window function does not support windowing then all windows will be considered
- // active.
- // So we must take conjunction of activeWindows and triggerRunner state.
- boolean windowIsActiveAndOpen =
- activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
-
- if (!windowIsActiveAndOpen) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
- }
-
- // If this is an end-of-window timer then we may need to set a garbage collection timer
- // if allowed lateness is non-zero.
- boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
- && timer.getTimestamp().equals(window.maxTimestamp());
-
- // If this is a garbage collection timer then we should trigger and garbage collect the window.
- // We'll consider any timer at or after the end-of-window time to be a signal to garbage
- // collect.
- Instant cleanupTime = garbageCollectionTime(window);
- boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
- && !timer.getTimestamp().isBefore(cleanupTime);
-
- if (isGarbageCollection) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
-
- if (windowIsActiveAndOpen) {
- // We need to call onTrigger to emit the final pane if required.
- // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
- // and the watermark has passed the end of the window.
- @Nullable Instant newHold =
- onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
- Preconditions.checkState(newHold == null,
- "Hold placed at %s despite isFinished being true.", newHold);
- }
-
- // Cleanup flavor B: Clear all the remaining state for this window since we'll never
- // see elements for it again.
- clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
- } else {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- if (windowIsActiveAndOpen) {
- emitIfAppropriate(directContext, renamedContext);
- }
-
- if (isEndOfWindow) {
- // If the window strategy trigger includes a watermark trigger then at this point
- // there should be no data holds, either because we'd already cleared them on an
- // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
- // We could assert this but it is very expensive.
-
- // Since we are processing an on-time firing we should schedule the garbage collection
- // timer. (If getAllowedLateness is zero then the timer event will be considered a
- // cleanup event and handled by the above).
- // Note we must do this even if the trigger is finished so that we are sure to clean up
- // any final trigger finished bits.
- Preconditions.checkState(
- windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
- "Unexpected zero getAllowedLateness");
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- Preconditions.checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
- "Cleanup time %s is beyond end-of-time", cleanupTime);
- directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
- }
- }
- }
-
- /**
- * Clear all the state associated with {@code context}'s window.
- * Should only be invoked if we know all future elements for this window will be considered
- * beyond allowed lateness.
- * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
- * <ol>
- * <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
- * closed again.
- * <li>We can clear any remaining garbage collection hold.
- * </ol>
- */
- private void clearAllState(
- ReduceFn<K, InputT, OutputT, W>.Context directContext,
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
- boolean windowIsActiveAndOpen)
- throws Exception {
- if (windowIsActiveAndOpen) {
- // Since both the window is in the active window set AND the trigger was not yet closed,
- // it is possible we still have state.
- reduceFn.clearState(renamedContext);
- watermarkHold.clearHolds(renamedContext);
- nonEmptyPanes.clearPane(renamedContext.state());
- // These calls work irrespective of whether the window is active or not, but
- // are unnecessary if the window is not active.
- triggerRunner.clearState(
- directContext.window(), directContext.timers(), directContext.state());
- paneInfoTracker.clear(directContext.state());
- } else {
- // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
- // For (1), if !activeWindows.isActive then the window must be merging and has been
- // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired
- // and been closed, so this case reduces to (2).
- // For (2), if triggerRunner.isClosed then the trigger was fired and entered the
- // closed state. In that case emitIfAppropriate will have cleared all state in
- // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
- // We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
- // Since the trigger fired the existing watermark holds must have been cleared, and since
- // the trigger closed no new end of window or garbage collection hold will have been
- // placed by WatermarkHold.extractAndRelease.
- // Thus all the state clearing above is unnecessary.
- //
- // But(!) for backwards compatibility we must allow a pipeline to be updated from
- // an sdk version <= 1.3. In that case it is possible we have an end-of-window or
- // garbage collection hold keyed by the current window (reached via directContext) rather
- // than the state address window (reached via renamedContext).
- // However this can only happen if:
- // - We have merging windows.
- // - We are DISCARDING_FIRED_PANES.
- // - A pane has fired.
- // - But the trigger is not (yet) closed.
- if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES
- && !windowingStrategy.getWindowFn().isNonMerging()) {
- watermarkHold.clearHolds(directContext);
- }
- }
-
- // Don't need to track address state windows anymore.
- activeWindows.remove(directContext.window());
- // We'll never need to test for the trigger being closed again.
- triggerRunner.clearFinished(directContext.state());
- }
-
-  /**
-   * Decides whether the reduce function's buffered state should be dropped once the current
-   * pane has fired.
-   *
-   * @param isFinished whether the trigger is closed after this firing
-   * @return {@code true} if this was the last firing for the trigger, or if the windowing
-   *     strategy is {@code DISCARDING_FIRED_PANES} so nothing may accumulate between panes
-   */
-  private boolean shouldDiscardAfterFiring(boolean isFinished) {
-    return isFinished
-        || windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES;
-  }
-
- /**
- * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
- */
- private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
- throws Exception {
- if (!triggerRunner.shouldFire(
- directContext.window(), directContext.timers(), directContext.state())) {
- // Ignore unless trigger is ready to fire
- return;
- }
-
- // Inform the trigger of the transition to see if it is finished
- triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
- boolean isFinished = triggerRunner.isClosed(directContext.state());
-
- // Will be able to clear all element state after triggering?
- boolean shouldDiscard = shouldDiscardAfterFiring(isFinished);
-
- // Run onTrigger to produce the actual pane contents.
- // As a side effect it will clear all element holds, but not necessarily any
- // end-of-window or garbage collection holds.
- onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);
-
- // Now that we've triggered, the pane is empty.
- nonEmptyPanes.clearPane(renamedContext.state());
-
- // Cleanup buffered data if appropriate
- if (shouldDiscard) {
- // Cleanup flavor C: The user does not want any buffered data to persist between panes.
- reduceFn.clearState(renamedContext);
- }
-
- if (isFinished) {
- // Cleanup flavor D: If trigger is closed we will ignore all new incoming elements.
- // Clear state not otherwise cleared by onTrigger and clearPane above.
- // Remember the trigger is, indeed, closed until the window is garbage collected.
- triggerRunner.clearState(
- directContext.window(), directContext.timers(), directContext.state());
- paneInfoTracker.clear(directContext.state());
- activeWindows.remove(directContext.window());
- }
- }
-
-  /**
-   * Decides whether a pane has to be emitted for the current firing.
-   *
-   * @param isEmpty whether the pane holds no elements
-   * @param isFinished whether this is known to be the final pane
-   * @param timing the timing of the pane that would be produced
-   * @return {@code true} if the pane has elements, is the unique {@code ON_TIME} pane, or is
-   *     the final pane and the closing behavior is {@code FIRE_ALWAYS}
-   */
-  private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
-    boolean hasElements = !isEmpty;
-    boolean isOnTimePane = timing == Timing.ON_TIME;
-    if (hasElements || isOnTimePane) {
-      return true;
-    }
-    // An empty pane that is not ON_TIME is only emitted when it is the final pane and the
-    // user has requested the final pane even when empty.
-    return isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS;
-  }
-
- /**
- * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
- *
- * @return output watermark hold added, or {@literal null} if none.
- */
- @Nullable
- private Instant onTrigger(
- final ReduceFn<K, InputT, OutputT, W>.Context directContext,
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
- boolean isFinished, boolean isEndOfWindow)
- throws Exception {
- Instant inputWM = timerInternals.currentInputWatermarkTime();
-
- // Prefetch necessary states
- ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
- watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
- ReadableState<PaneInfo> paneFuture =
- paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
- ReadableState<Boolean> isEmptyFuture =
- nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-
- reduceFn.prefetchOnTrigger(directContext.state());
- triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
-
- // Calculate the pane info.
- final PaneInfo pane = paneFuture.read();
- // Extract the window hold, and as a side effect clear it.
-
- WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
- final Instant outputTimestamp = pair.oldHold;
- @Nullable Instant newHold = pair.newHold;
-
- if (newHold != null) {
- // We can't be finished yet.
- Preconditions.checkState(
- !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
- // The hold cannot be behind the input watermark.
- Preconditions.checkState(
- !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
- if (newHold.isAfter(directContext.window().maxTimestamp())) {
- // The hold must be for garbage collection, which can't have happened yet.
- Preconditions.checkState(
- newHold.isEqual(garbageCollectionTime(directContext.window())),
- "new hold %s should be at garbage collection for window %s plus %s",
- newHold,
- directContext.window(),
- windowingStrategy.getAllowedLateness());
- } else {
- // The hold must be for the end-of-window, which can't have happened yet.
- Preconditions.checkState(
- newHold.isEqual(directContext.window().maxTimestamp()),
- "new hold %s should be at end of window %s",
- newHold,
- directContext.window());
- Preconditions.checkState(
- !isEndOfWindow,
- "new hold at %s for %s but this is the watermark trigger",
- newHold,
- directContext.window());
- }
- }
-
- // Only emit a pane if it has data or empty panes are observable.
- if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
- // Run reduceFn.onTrigger method.
- final List<W> windows = Collections.singletonList(directContext.window());
- ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
- contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
- new OnTriggerCallbacks<OutputT>() {
- @Override
- public void output(OutputT toOutput) {
- // We're going to output panes, so commit the (now used) PaneInfo.
- // TODO: This is unnecessary if the trigger isFinished since the saved
- // state will be immediately deleted.
- paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
-
- // Output the actual value.
- outputter.outputWindowedValue(
- KV.of(key, toOutput), outputTimestamp, windows, pane);
- }
- });
-
- reduceFn.onTrigger(renamedTriggerContext);
- }
-
- return newHold;
- }
-
- /**
- * Make sure we'll eventually have a timer fire which will tell us to garbage collect
- * the window state. For efficiency we may need to do this in two steps rather
- * than one. Return the time at which the timer will fire.
- *
- * <ul>
- * <li>If allowedLateness is zero then we'll garbage collect at the end of the window.
- * For simplicity we'll set our own timer for this situation even though an
- * {@link AfterWatermark} trigger may have also set an end-of-window timer.
- * ({@code setTimer} is idempotent.)
- * <li>If allowedLateness is non-zero then we could just always set a timer for the garbage
- * collection time. However if the windows are large (eg hourly) and the allowedLateness is small
- * (eg seconds) then we'll end up with nearly twice the number of timers in-flight. So we
- * instead set an end-of-window timer and then roll that forward to a garbage collection timer
- * when it fires. We use the input watermark to distinguish those cases.
- * </ul>
- */
- private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
- ReduceFn<?, ?, ?, W>.Context directContext) {
- Instant inputWM = timerInternals.currentInputWatermarkTime();
- Instant endOfWindow = directContext.window().maxTimestamp();
- String which;
- Instant timer;
- if (endOfWindow.isBefore(inputWM)) {
- timer = garbageCollectionTime(directContext.window());
- which = "garbage collection";
- } else {
- timer = endOfWindow;
- which = "end-of-window";
- }
- WindowTracing.trace(
- "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
- + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
- which,
- timer,
- key,
- directContext.window(),
- inputWM,
- timerInternals.currentOutputWatermarkTime());
- Preconditions.checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
- "Timer %s is beyond end-of-time", timer);
- directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
- return timer;
- }
-
-  /**
-   * Deletes the event-time timers for the window of {@code directContext}: the end-of-window
-   * timer and, when it falls at a later instant, the garbage collection timer.
-   *
-   * @param directContext context whose window and timers are used to locate and delete the
-   *     timers
-   */
-  private void cancelEndOfWindowAndGarbageCollectionTimers(
-      ReduceFn<?, ?, ?, W>.Context directContext) {
-    WindowTracing.debug(
-        "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
-        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
-        key, directContext.window(), timerInternals.currentInputWatermarkTime(),
-        timerInternals.currentOutputWatermarkTime());
-    Instant eow = directContext.window().maxTimestamp();
-    directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
-    Instant gc = garbageCollectionTime(directContext.window());
-    if (gc.isAfter(eow)) {
-      // The garbage collection timer is a separate timer only when it lies after the end of
-      // the window. Delete it by its own timestamp; deleting eow again would leave it set.
-      directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
-    }
-  }
-
-  /**
-   * Computes when {@code window} should be garbage collected: the window's maximum timestamp
-   * plus the allowed lateness, truncated to the end of the global window if it would fall
-   * beyond it.
-   */
-  private Instant garbageCollectionTime(W window) {
-    Instant endOfTime = GlobalWindow.INSTANCE.maxTimestamp();
-    Instant endOfWindow = window.maxTimestamp();
-
-    // The check subtracts the allowed lateness from the "end of time" instead of adding it to
-    // the end of the window, because that addition might overflow the maximum allowed Instant.
-    boolean fitsBeforeEndOfTime =
-        !endOfTime.minus(windowingStrategy.getAllowedLateness()).isBefore(endOfWindow);
-    if (fitsBeforeEndOfTime) {
-      return endOfWindow.plus(windowingStrategy.getAllowedLateness());
-    }
-    return endOfTime;
-  }
-
- /**
- * An object that can output a value with all of its windowing information. This is a deliberately
- * restricted subinterface of {@link WindowingInternals} to express how it is used here.
- */
- private interface OutputWindowedValue<OutputT> {
- void outputWindowedValue(OutputT output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane);
- }
-
- private static class OutputViaWindowingInternals<OutputT>
- implements OutputWindowedValue<OutputT> {
-
- private final WindowingInternals<?, OutputT> windowingInternals;
-
- public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
- this.windowingInternals = windowingInternals;
- }
-
- @Override
- public void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
new file mode 100644
index 0000000..d129c8e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Helper transform that makes timestamps and window assignments explicit in the value part of
+ * each key/value pair.
+ */
+public class ReifyTimestampsAndWindows<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
+
+ @Override
+ public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
+
+ // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
+ // to this implementation of GBK. All runners need a way to get the key.
+ checkArgument(
+ input.getCoder() instanceof KvCoder,
+ "%s requires its input to use a %s",
+ GroupByKey.class.getSimpleName(),
+ KvCoder.class.getSimpleName());
+
+ @SuppressWarnings("unchecked")
+ KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+ Coder<WindowedValue<V>> outputValueCoder =
+ FullWindowedValueCoder.of(
+ inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+ Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+ return input
+ .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
+ .setCoder(outputKvCoder);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
deleted file mode 100644
index 78377c8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.List;
-
-/**
- * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
- *
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
- */
-public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{
-
- protected SimpleDoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext,
- AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) {
- super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext,
- addCounterMutator, windowingStrategy);
- }
-
- @Override
- protected void invokeProcessElement(WindowedValue<InputT> elem) {
- final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem);
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.processElement(processContext);
- } catch (Exception ex) {
- throw wrapUserCodeException(ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
deleted file mode 100644
index 2eeee54..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-
-/**
- * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
- extends ReduceFn<K, InputT, OutputT, W> {
- private static final String BUFFER_NAME = "buf";
-
- /**
- * Create a factory that produces {@link SystemReduceFn} instances that buffer all of the
- * input values in persistent state and produce an {@code Iterable<T>}.
- */
- public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
- buffering(final Coder<T> inputCoder) {
- final StateTag<Object, BagState<T>> bufferTag =
- StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
- return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
- @Override
- public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
- StateMerging.prefetchBags(state, bufferTag);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- StateMerging.mergeBags(c.state(), bufferTag);
- }
- };
- }
-
- /**
- * Create a factory that produces {@link SystemReduceFn} instances that combine all of the input
- * values using a {@link CombineFn}.
- */
- public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT,
- AccumT, OutputT, W>
- combining(
- final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
- final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
- if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
- bufferTag = StateTags.makeSystemTagInternal(
- StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
- BUFFER_NAME, combineFn.getAccumulatorCoder(),
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
-
- } else {
- bufferTag = StateTags.makeSystemTagInternal(
- StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
- BUFFER_NAME, combineFn.getAccumulatorCoder(),
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
- }
- return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
- @Override
- public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
- StateMerging.prefetchCombiningValues(state, bufferTag);
- }
-
- @Override
- public void onMerge(OnMergeContext c) throws Exception {
- StateMerging.mergeCombiningValues(c.state(), bufferTag);
- }
- };
- }
-
- private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
-
- public SystemReduceFn(
- StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
- this.bufferTag = bufferTag;
- }
-
- @Override
- public void processValue(ProcessValueContext c) throws Exception {
- c.state().access(bufferTag).add(c.value());
- }
-
- @Override
- public void prefetchOnTrigger(StateAccessor<K> state) {
- state.access(bufferTag).readLater();
- }
-
- @Override
- public void onTrigger(OnTriggerContext c) throws Exception {
- c.output(c.state().access(bufferTag).read());
- }
-
- @Override
- public void clearState(Context c) throws Exception {
- c.state().access(bufferTag).clear();
- }
-
- @Override
- public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
- return state.access(bufferTag).isEmpty();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
deleted file mode 100644
index f104f6a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- * <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- * constructing the appropriate trigger contexts.</li>
- * <li>Committing a record of which subtriggers are finished to persistent state.</li>
- * <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- * <li>Clearing out the persisted finished set when a caller indicates
- * (via {@link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
- @VisibleForTesting
- static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
- StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
- private final ExecutableTrigger rootTrigger;
- private final TriggerContextFactory<W> contextFactory;
-
- public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
- Preconditions.checkState(rootTrigger.getTriggerIndex() == 0);
- this.rootTrigger = rootTrigger;
- this.contextFactory = contextFactory;
- }
-
- private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
- if (!isFinishedSetNeeded()) {
- // If no trigger in the tree will ever have finished bits, then we don't need to read them.
- // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
- // finished) for each trigger in the tree.
- return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
- }
-
- BitSet bitSet = state.read();
- return bitSet == null
- ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
- : FinishedTriggersBitSet.fromBitSet(bitSet);
- }
-
-
- private void clearFinishedBits(ValueState<BitSet> state) {
- if (!isFinishedSetNeeded()) {
- // Nothing to clear.
- return;
- }
- state.clear();
- }
-
- /** Return true if the trigger is closed in the window corresponding to the specified state. */
- public boolean isClosed(StateAccessor<?> state) {
- return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
- }
-
- public void prefetchForValue(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchOnElement(
- contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- public void prefetchOnFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- public void prefetchShouldFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchShouldFire(
- contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- /**
- * Run the trigger logic to deal with a new value.
- */
- public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
- throws Exception {
- // Clone so that we can detect changes and so that changes here don't pollute merging.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
- window, timers, timestamp, rootTrigger, finishedSet);
- rootTrigger.invokeOnElement(triggerContext);
- persistFinishedSet(state, finishedSet);
- }
-
- public void prefetchForMerge(
- W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
- if (isFinishedSetNeeded()) {
- for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
- value.readLater();
- }
- }
- rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
- window, mergingWindows, rootTrigger));
- }
-
- /**
- * Run the trigger merging logic as part of executing the specified merge.
- */
- public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
- // Clone so that we can detect changes and so that changes here don't pollute merging.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
- // And read the finished bits in each merging window.
- ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
- for (Map.Entry<W, ValueState<BitSet>> entry :
- state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
- // Don't need to clone these, since the trigger context doesn't allow modification
- builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
- // Clear the underlying finished bits.
- clearFinishedBits(entry.getValue());
- }
- ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
- Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
- window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
- // Run the merge from the trigger
- rootTrigger.invokeOnMerge(mergeContext);
-
- persistFinishedSet(state, finishedSet);
- }
-
- public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
- FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger.TriggerContext context = contextFactory.base(window, timers,
- rootTrigger, finishedSet);
- return rootTrigger.invokeShouldFire(context);
- }
-
- public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
- // Callers are expected to invoke this only after shouldFire has returned true.
- // However that precondition is too expensive to assert here, so it is not checked.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger.TriggerContext context = contextFactory.base(window, timers,
- rootTrigger, finishedSet);
- rootTrigger.invokeOnFire(context);
- persistFinishedSet(state, finishedSet);
- }
-
- private void persistFinishedSet(
- StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
- if (!isFinishedSetNeeded()) {
- return;
- }
-
- ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
- if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
- if (modifiedFinishedSet.getBitSet().isEmpty()) {
- finishedSetState.clear();
- } else {
- finishedSetState.write(modifiedFinishedSet.getBitSet());
- }
- }
- }
-
- /**
- * Clear the finished bits.
- */
- public void clearFinished(StateAccessor<?> state) {
- clearFinishedBits(state.access(FINISHED_BITS_TAG));
- }
-
- /**
- * Clear the state used for executing triggers, but leave the finished set to indicate
- * the window is closed.
- */
- public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
- // Don't need to clone, because we'll be clearing the finished bits anyways.
- FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
- rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
- }
-
- private boolean isFinishedSetNeeded() {
- // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
- // lookup. Right now, we special case this for the DefaultTrigger.
- return !(rootTrigger.getSpec() instanceof DefaultTrigger);
- }
-}