You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/30 22:23:02 UTC
[1/2] incubator-beam git commit: Improvements to ReduceFnRunner
prefetching
Repository: incubator-beam
Updated Branches:
refs/heads/master b75a76459 -> 4a7da91f0
Improvements to ReduceFnRunner prefetching
- add prefetch* methods for prefetching state matching existing methods
- replace onTimer with batched onTimers method to allow prefetching
across timers
- prefetch triggers in processElements
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4282c67c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4282c67c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4282c67c
Branch: refs/heads/master
Commit: 4282c67c5fa4dea2fe6c8695e0ea23f383c6457b
Parents: b75a764
Author: Sam Whittle <sa...@google.com>
Authored: Thu Nov 10 12:59:49 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 13:32:05 2016 -0800
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 11 +-
.../beam/runners/core/PaneInfoTracker.java | 4 +
.../runners/core/ReduceFnContextFactory.java | 9 +-
.../beam/runners/core/ReduceFnRunner.java | 488 ++++++++++++-------
.../apache/beam/runners/core/WatermarkHold.java | 5 +
.../triggers/TriggerStateMachineRunner.java | 14 +-
.../beam/runners/core/ReduceFnTester.java | 4 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 5 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 6 +-
.../sdk/util/state/InMemoryTimerInternals.java | 22 +-
.../beam/sdk/util/state/TimerCallback.java | 9 +-
.../util/state/InMemoryTimerInternalsTest.java | 54 +-
12 files changed, 407 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 8b10813..294f21d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -73,9 +72,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
@Override
public void processElement(ProcessContext c) throws Exception {
- KeyedWorkItem<K, InputT> element = c.element();
+ KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
- K key = c.element().key();
+ K key = keyedWorkItem.key();
TimerInternals timerInternals = c.windowingInternals().timerInternals();
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
@@ -93,10 +92,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
reduceFn,
c.getPipelineOptions());
- reduceFnRunner.processElements(element.elementsIterable());
- for (TimerData timer : element.timersIterable()) {
- reduceFnRunner.onTimer(timer);
- }
+ reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
+ reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
reduceFnRunner.persist();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 8140243..69a4cfd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -54,6 +54,10 @@ public class PaneInfoTracker {
state.access(PANE_INFO_TAG).clear();
}
+ public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
+ context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
+ }
+
/**
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
* info includes the timing for the pane, who's calculation is quite subtle.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 539126a..c5bda9b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateAccessor;
import org.apache.beam.sdk.util.state.StateContext;
@@ -117,7 +116,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
}
public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
- ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
+ PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
}
@@ -389,11 +388,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
private final StateAccessorImpl<K, W> state;
- private final ReadableState<PaneInfo> pane;
+ private final PaneInfo pane;
private final OnTriggerCallbacks<OutputT> callbacks;
private final TimersImpl timers;
- private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
+ private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
OnTriggerCallbacks<OutputT> callbacks) {
reduceFn.super();
this.state = state;
@@ -424,7 +423,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
@Override
public PaneInfo paneInfo() {
- return pane.read();
+ return pane;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a686f46..3a82be9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -21,12 +21,15 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,7 +61,6 @@ import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
import org.apache.beam.sdk.util.state.TimerCallback;
@@ -268,6 +270,32 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
return activeWindows.getActiveAndNewWindows().isEmpty();
}
+ private Set<W> openWindows(Collection<W> windows) {
+ Set<W> result = new HashSet<>();
+ for (W window : windows) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+ window, StateStyle.DIRECT);
+ if (!triggerRunner.isClosed(directContext.state())) {
+ result.add(window);
+ }
+ }
+ return result;
+ }
+
+ private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception {
+ Collection<W> result = new ArrayList<>();
+ // Filter out timers that didn't trigger.
+ for (W window : windows) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext =
+ contextFactory.base(window, StateStyle.DIRECT);
+ if (triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state())) {
+ result.add(window);
+ }
+ }
+ return result;
+ }
+
/**
* Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
* triggers, and window merging.
@@ -293,25 +321,54 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
* </ol>
*/
public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
+ if (!values.iterator().hasNext()) {
+ return;
+ }
+
+ // Determine all the windows for elements.
+ Set<W> windows = collectWindows(values);
// If an incoming element introduces a new window, attempt to merge it into an existing
// window eagerly.
- Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
+ Map<W, W> windowToMergeResult = mergeWindows(windows);
+ if (!windowToMergeResult.isEmpty()) {
+ // Update windows by removing all windows that were merged away and adding
+ // the windows they were merged to. We add after completing all the
+ // removals to avoid removing a window that was also added.
+ List<W> addedWindows = new ArrayList<>(windowToMergeResult.size());
+ for (Map.Entry<W, W> entry : windowToMergeResult.entrySet()) {
+ windows.remove(entry.getKey());
+ addedWindows.add(entry.getValue());
+ }
+ windows.addAll(addedWindows);
+ }
- Set<W> windowsToConsider = new HashSet<>();
+ prefetchWindowsForValues(windows);
+
+ // All windows that are open before element processing may need to fire.
+ Set<W> windowsToConsider = openWindows(windows);
// Process each element, using the updated activeWindows determined by collectAndMergeWindows.
for (WindowedValue<InputT> value : values) {
- windowsToConsider.addAll(processElement(windowToMergeResult, value));
+ processElement(windowToMergeResult, value);
}
- // Trigger output from any window for which the trigger is ready
+ // Now that we've processed the elements, see if any of the windows need to fire.
+ // Prefetch state necessary to determine if the triggers should fire.
for (W mergedWindow : windowsToConsider) {
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(mergedWindow, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(mergedWindow, StateStyle.RENAMED);
- triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
- emitIfAppropriate(directContext, renamedContext);
+ triggerRunner.prefetchShouldFire(
+ mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state());
+ }
+ // Filter to windows that are firing.
+ Collection<W> windowsToFire = windowsThatShouldFire(windowsToConsider);
+ // Prefetch windows that are firing.
+ for (W window : windowsToFire) {
+ prefetchEmit(contextFactory.base(window, StateStyle.DIRECT),
+ contextFactory.base(window, StateStyle.RENAMED));
+ }
+ // Trigger output from firing windows.
+ for (W window : windowsToFire) {
+ emit(contextFactory.base(window, StateStyle.DIRECT),
+ contextFactory.base(window, StateStyle.RENAMED));
}
// We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -325,52 +382,61 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
/**
- * Extract the windows associated with the values, and invoke merge. Return a map
- * from windows to the merge result window. If a window is not in the domain of
- * the result map then it did not get merged into a different window.
+ * Extract the windows associated with the values.
*/
- private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
- throws Exception {
- // No-op if no merging can take place
+ private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
+ Set<W> windows = new HashSet<>();
+ for (WindowedValue<?> value : values) {
+ for (BoundedWindow untypedWindow : value.getWindows()) {
+ @SuppressWarnings("unchecked")
+ W window = (W) untypedWindow;
+ windows.add(window);
+ }
+ }
+ return windows;
+ }
+
+ /**
+ * Invoke merge for the given windows and return a map from windows to the
+ * merge result window. Windows that were not merged are not present in the
+ * map.
+ */
+ private Map<W, W> mergeWindows(Set<W> windows) throws Exception {
if (windowingStrategy.getWindowFn().isNonMerging()) {
- return ImmutableMap.of();
+ // Return an empty map, indicating that every window is not merged.
+ return Collections.emptyMap();
}
+ Map<W, W> windowToMergeResult = new HashMap<>();
// Collect the windows from all elements (except those which are too late) and
// make sure they are already in the active window set or are added as NEW windows.
- for (WindowedValue<?> value : values) {
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
-
- // For backwards compat with pre 1.4 only.
- // We may still have ACTIVE windows with multiple state addresses, representing
- // a window who's state has not yet been eagerly merged.
- // We'll go ahead and merge that state now so that we don't have to worry about
- // this legacy case anywhere else.
- if (activeWindows.isActive(window)) {
- Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
- if (stateAddressWindows.size() > 1) {
- // This is a legacy window who's state has not been eagerly merged.
- // Do that now.
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
- contextFactory.forPremerge(window);
- reduceFn.onMerge(premergeContext);
- watermarkHold.onMerge(premergeContext);
- activeWindows.merged(window);
- }
+ for (W window : windows) {
+ // For backwards compat with pre 1.4 only.
+ // We may still have ACTIVE windows with multiple state addresses, representing
+ // a window who's state has not yet been eagerly merged.
+ // We'll go ahead and merge that state now so that we don't have to worry about
+ // this legacy case anywhere else.
+ if (activeWindows.isActive(window)) {
+ Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
+ if (stateAddressWindows.size() > 1) {
+ // This is a legacy window who's state has not been eagerly merged.
+ // Do that now.
+ ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
+ contextFactory.forPremerge(window);
+ reduceFn.onMerge(premergeContext);
+ watermarkHold.onMerge(premergeContext);
+ activeWindows.merged(window);
}
-
- // Add this window as NEW if it is not currently ACTIVE.
- // If we had already seen this window and closed its trigger, then the
- // window will not be currently ACTIVE. It will then be added as NEW here,
- // and fall into the merging logic as usual.
- activeWindows.ensureWindowExists(window);
}
+
+ // Add this window as NEW if it is not currently ACTIVE.
+ // If we had already seen this window and closed its trigger, then the
+ // window will not be currently ACTIVE. It will then be added as NEW here,
+ // and fall into the merging logic as usual.
+ activeWindows.ensureWindowExists(window);
}
// Merge all of the active windows and retain a mapping from source windows to result windows.
- Map<W, W> windowToMergeResult = new HashMap<>();
activeWindows.merge(new OnMergeCallback(windowToMergeResult));
return windowToMergeResult;
}
@@ -472,38 +538,50 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
/**
- * Process an element.
- *
- * @param value the value being processed
- * @return the set of windows in which the element was actually processed
+ * Redirect element windows to the ACTIVE windows they have been merged into.
+ * The compressed representation (value, {window1, window2, ...}) actually represents
+ * distinct elements (value, window1), (value, window2), ...
+ * so if window1 and window2 merge, the resulting window will contain both copies
+ * of the value.
*/
- private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
- throws Exception {
- // Redirect element windows to the ACTIVE windows they have been merged into.
- // The compressed representation (value, {window1, window2, ...}) actually represents
- // distinct elements (value, window1), (value, window2), ...
- // so if window1 and window2 merge, the resulting window will contain both copies
- // of the value.
- Collection<W> windows = new ArrayList<>();
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
- W mergeResult = windowToMergeResult.get(window);
- if (mergeResult == null) {
- mergeResult = window;
- }
- windows.add(mergeResult);
- }
+ private ImmutableSet<W> toMergedWindows(final Map<W, W> windowToMergeResult,
+ final Collection<? extends BoundedWindow> windows) {
+ return ImmutableSet.copyOf(
+ FluentIterable.from(windows).transform(
+ new Function<BoundedWindow, W>() {
+ @Override
+ public W apply(BoundedWindow untypedWindow) {
+ @SuppressWarnings("unchecked")
+ W window = (W) untypedWindow;
+ W mergedWindow = windowToMergeResult.get(window);
+ // If the element is not present in the map, the window is unmerged.
+ return (mergedWindow == null) ? window : mergedWindow;
+ }
+ }
+ ));
+ }
+ private void prefetchWindowsForValues(Collection<W> windows) {
// Prefetch in each of the windows if we're going to need to process triggers
for (W window : windows) {
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+ window, StateStyle.DIRECT);
triggerRunner.prefetchForValue(window, directContext.state());
}
+ }
+
+ /**
+ * Process an element.
+ *
+ * @param windowToMergeResult map of windows to merged windows. If a window is
+ * not present it is unmerged.
+ * @param value the value being processed
+ */
+ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
+ throws Exception {
+ ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, value.getWindows());
// Process the element for each (mergeResultWindow, not closed) window it belongs to.
- List<W> triggerableWindows = new ArrayList<>(windows.size());
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
@@ -518,7 +596,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
continue;
}
- triggerableWindows.add(window);
activeWindows.ensureWindowIsActive(window);
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
@@ -562,102 +639,152 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
// cannot take a trigger state from firing to non-firing.
// (We don't actually assert this since it is too slow.)
}
-
- return triggerableWindows;
}
/**
- * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
+ * Enriches TimerData with state necessary for processing a timer as well as
+ * common queries about a timer.
*/
- public void onTimer(TimerData timer) throws Exception {
- // Which window is the timer for?
- checkArgument(timer.getNamespace() instanceof WindowNamespace,
- "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
- @SuppressWarnings("unchecked")
- WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
- W window = windowNamespace.getWindow();
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(window, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(window, StateStyle.RENAMED);
+ private class EnrichedTimerData {
+ public final Instant timestamp;
+ public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
+ public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
+ // If this is an end-of-window timer then we may need to set a garbage collection timer
+ // if allowed lateness is non-zero.
+ public final boolean isEndOfWindow;
+ // If this is a garbage collection timer then we should trigger and
+ // garbage collect the window. We'll consider any timer at or after the
+ // end-of-window time to be a signal to garbage collect.
+ public final boolean isGarbageCollection;
+
+ EnrichedTimerData(
+ TimerData timer,
+ ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+ this.timestamp = timer.getTimestamp();
+ this.directContext = directContext;
+ this.renamedContext = renamedContext;
+ W window = directContext.window();
+ this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+ && timer.getTimestamp().equals(window.maxTimestamp());
+ Instant cleanupTime = garbageCollectionTime(window);
+ this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
+ }
// Has this window had its trigger finish?
// - The trigger may implement isClosed as constant false.
// - If the window function does not support windowing then all windows will be considered
// active.
// So we must take conjunction of activeWindows and triggerRunner state.
- boolean windowIsActiveAndOpen =
- activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
+ public boolean windowIsActiveAndOpen() {
+ return activeWindows.isActive(directContext.window())
+ && !triggerRunner.isClosed(directContext.state());
+ }
+ }
- if (!windowIsActiveAndOpen) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
+ public void onTimers(Iterable<TimerData> timers) throws Exception {
+ if (!timers.iterator().hasNext()) {
+ return;
}
- // If this is an end-of-window timer then we may need to set a garbage collection timer
- // if allowed lateness is non-zero.
- boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
- && timer.getTimestamp().equals(window.maxTimestamp());
-
- // If this is a garbage collection timer then we should trigger and garbage collect the window.
- // We'll consider any timer at or after the end-of-window time to be a signal to garbage
- // collect.
- Instant cleanupTime = garbageCollectionTime(window);
- boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
- && !timer.getTimestamp().isBefore(cleanupTime);
-
- if (isGarbageCollection) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
-
- if (windowIsActiveAndOpen) {
- // We need to call onTrigger to emit the final pane if required.
- // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
- // and the watermark has passed the end of the window.
- @Nullable Instant newHold =
- onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
- checkState(newHold == null,
- "Hold placed at %s despite isFinished being true.", newHold);
+ // Create a reusable context for each timer and begin prefetching necessary
+ // state.
+ List<EnrichedTimerData> enrichedTimers = new LinkedList();
+ for (TimerData timer : timers) {
+ checkArgument(timer.getNamespace() instanceof WindowNamespace,
+ "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
+ @SuppressWarnings("unchecked")
+ WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
+ W window = windowNamespace.getWindow();
+ ReduceFn<K, InputT, OutputT, W>.Context directContext =
+ contextFactory.base(window, StateStyle.DIRECT);
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+ contextFactory.base(window, StateStyle.RENAMED);
+ EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext);
+ enrichedTimers.add(enrichedTimer);
+
+ // Perform prefetching of state to determine if the trigger should fire.
+ if (enrichedTimer.isGarbageCollection) {
+ triggerRunner.prefetchIsClosed(directContext.state());
+ } else {
+ triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
}
+ }
- // Cleanup flavor B: Clear all the remaining state for this window since we'll never
- // see elements for it again.
- clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
- } else {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- if (windowIsActiveAndOpen) {
- emitIfAppropriate(directContext, renamedContext);
+ // For those windows that are active and open, prefetch the triggering or emitting state.
+ for (EnrichedTimerData timer : enrichedTimers) {
+ if (timer.windowIsActiveAndOpen()) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+ if (timer.isGarbageCollection) {
+ prefetchOnTrigger(directContext, timer.renamedContext);
+ } else if (triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state())) {
+ prefetchEmit(directContext, timer.renamedContext);
+ }
}
+ }
- if (isEndOfWindow) {
- // If the window strategy trigger includes a watermark trigger then at this point
- // there should be no data holds, either because we'd already cleared them on an
- // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
- // We could assert this but it is very expensive.
-
- // Since we are processing an on-time firing we should schedule the garbage collection
- // timer. (If getAllowedLateness is zero then the timer event will be considered a
- // cleanup event and handled by the above).
- // Note we must do this even if the trigger is finished so that we are sure to cleanup
- // any final trigger finished bits.
- checkState(
- windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
- "Unexpected zero getAllowedLateness");
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+ // Perform processing now that everything is prefetched.
+ for (EnrichedTimerData timer : enrichedTimers) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;
+
+ if (timer.isGarbageCollection) {
+ WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
+ + "inputWatermark:{}; outputWatermark:{}",
+ key, directContext.window(), timer.timestamp,
+ timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
- "Cleanup time %s is beyond end-of-time", cleanupTime);
- directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+
+ boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
+ if (windowIsActiveAndOpen) {
+ // We need to call onTrigger to emit the final pane if required.
+ // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
+ // and the watermark has passed the end of the window.
+ @Nullable
+ Instant newHold = onTrigger(
+ directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
+ checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
+ }
+
+ // Cleanup flavor B: Clear all the remaining state for this window since we'll never
+ // see elements for it again.
+ clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
+ } else {
+ WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+ + "inputWatermark:{}; outputWatermark:{}",
+ key, directContext.window(), timer.timestamp,
+ timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ if (timer.windowIsActiveAndOpen()
+ && triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state())) {
+ emit(directContext, renamedContext);
+ }
+
+ if (timer.isEndOfWindow) {
+ // If the window strategy trigger includes a watermark trigger then at this point
+ // there should be no data holds, either because we'd already cleared them on an
+ // earlier onTrigger, or because we just cleared them on the above emit.
+ // We could assert this but it is very expensive.
+
+ // Since we are processing an on-time firing we should schedule the garbage collection
+ // timer. (If getAllowedLateness is zero then the timer event will be considered a
+ // cleanup event and handled by the above).
+ // Note we must do this even if the trigger is finished so that we are sure to cleanup
+ // any final trigger finished bits.
+ checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+ "Unexpected zero getAllowedLateness");
+ Instant cleanupTime = garbageCollectionTime(directContext.window());
+ WindowTracing.debug(
+ "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
+ + "inputWatermark:{}; outputWatermark:{}",
+ key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Cleanup time %s is beyond end-of-time", cleanupTime);
+ directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+ }
}
}
}
@@ -666,7 +793,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
* Clear all the state associated with {@code context}'s window.
* Should only be invoked if we know all future elements for this window will be considered
* beyond allowed lateness.
- * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
+ * This is a superset of the clearing done by {@link #emitPane} below since:
* <ol>
* <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
* closed again.
@@ -692,10 +819,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
} else {
// If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
// For (1), if !activeWindows.isActive then the window must be merging and has been
- // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired
+ // explicitly removed by emit. But in that case the trigger must have fired
// and been closed, so this case reduces to (2).
// For (2), if triggerRunner.isClosed then the trigger was fired and entered the
- // closed state. In that case emitIfAppropriate will have cleared all state in
+ // closed state. In that case emit will have cleared all state in
// reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
// We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
// Since the trigger fired the existing watermark holds must have been cleared, and since
@@ -737,17 +864,23 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
return false;
}
+ private void prefetchEmit(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+ triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
+ triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
+ triggerRunner.prefetchIsClosed(directContext.state());
+ prefetchOnTrigger(directContext, renamedContext);
+ }
+
/**
- * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
+ * Emit if a trigger is ready to fire or timers require it, and cleanup state.
*/
- private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ private void emit(
+ ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
throws Exception {
- if (!triggerRunner.shouldFire(
- directContext.window(), directContext.timers(), directContext.state())) {
- // Ignore unless trigger is ready to fire
- return;
- }
+ checkState(triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state()));
// Inform the trigger of the transition to see if it is finished
triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
@@ -782,7 +915,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
/**
- * Do we need to emit a pane?
+ * Do we need to emit?
*/
private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
if (!isEmpty) {
@@ -800,6 +933,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
return false;
}
+ private void prefetchOnTrigger(
+ final ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+ paneInfoTracker.prefetchPaneInfo(directContext);
+ watermarkHold.prefetchExtract(renamedContext);
+ nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
+ reduceFn.prefetchOnTrigger(directContext.state());
+ }
+
/**
* Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
*
@@ -813,25 +955,17 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
throws Exception {
Instant inputWM = timerInternals.currentInputWatermarkTime();
- // Prefetch necessary states
- ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
- watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
- ReadableState<PaneInfo> paneFuture =
- paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
- ReadableState<Boolean> isEmptyFuture =
- nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-
- reduceFn.prefetchOnTrigger(directContext.state());
- triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
-
// Calculate the pane info.
- final PaneInfo pane = paneFuture.read();
- // Extract the window hold, and as a side effect clear it.
+ final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
- WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+ // Extract the window hold, and as a side effect clear it.
+ final WatermarkHold.OldAndNewHolds pair =
+ watermarkHold.extractAndRelease(renamedContext, isFinished).read();
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;
+ final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
+
if (newHold != null) {
// We can't be finished yet.
checkState(
@@ -863,11 +997,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
// Only emit a pane if it has data or empty panes are observable.
- if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
+ if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
// Run reduceFn.onTrigger method.
final List<W> windows = Collections.singletonList(directContext.window());
ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
- contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
+ contextFactory.forTrigger(directContext.window(), pane, StateStyle.RENAMED,
new OnTriggerCallbacks<OutputT>() {
@Override
public void output(OutputT toOutput) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 3c04571..7f1afcc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -444,6 +444,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
}
}
+ public void prefetchExtract(final ReduceFn<?, ?, ?, W>.Context context) {
+ context.state().access(elementHoldTag).readLater();
+ context.state().access(EXTRA_HOLD_TAG).readLater();
+ }
+
/**
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 9f03216..2f277eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -99,25 +99,25 @@ public class TriggerStateMachineRunner<W extends BoundedWindow> {
return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
}
- public void prefetchForValue(W window, StateAccessor<?> state) {
+ public void prefetchIsClosed(StateAccessor<?> state) {
if (isFinishedSetNeeded()) {
state.access(FINISHED_BITS_TAG).readLater();
}
+ }
+
+ public void prefetchForValue(W window, StateAccessor<?> state) {
+ prefetchIsClosed(state);
rootTrigger.getSpec().prefetchOnElement(
contextFactory.createStateAccessor(window, rootTrigger));
}
public void prefetchOnFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
+ prefetchIsClosed(state);
rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
}
public void prefetchShouldFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
+ prefetchIsClosed(state);
rootTrigger.getSpec().prefetchShouldFire(
contextFactory.createStateAccessor(window, rootTrigger));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 337be23..8be8ae5 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -509,8 +509,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- runner.onTimer(
+ ArrayList timers = new ArrayList(1);
+ timers.add(
TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
+ runner.onTimers(timers);
runner.persist();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9d25bc6..f70fb94 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -201,9 +200,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
// Drop any elements within expired windows
reduceFnRunner.processElements(
dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
- for (TimerData timer : workItem.timersIterable()) {
- reduceFnRunner.onTimer(timer);
- }
+ reduceFnRunner.onTimers(workItem.timersIterable());
reduceFnRunner.persist();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index daa8a06..f8b1222 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -466,8 +466,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private static TimerCallback collectInto(final List<TimerInternals.TimerData> firedTimers) {
return new TimerCallback() {
@Override
- public void onTimer(TimerInternals.TimerData timer) throws Exception {
- firedTimers.add(timer);
+ public void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception {
+ for (TimerInternals.TimerData timer : timers) {
+ firedTimers.add(timer);
+ }
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index a3bb45a..f1ddaac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
@@ -235,13 +236,20 @@ public class InMemoryTimerInternals implements TimerInternals {
throws Exception {
checkNotNull(timerCallback);
PriorityQueue<TimerData> queue = queue(domain);
- while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
- // Remove before firing, so that if the callback adds another identical
- // timer we don't remove it.
- TimerData timer = queue.remove();
- WindowTracing.trace(
- "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime);
- timerCallback.onTimer(timer);
+ while (true) {
+ ArrayList<TimerData> firedTimers = new ArrayList();
+ while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
+ // Remove before firing, so that if the callback adds another identical
+ // timer we don't remove it.
+ TimerData timer = queue.remove();
+ firedTimers.add(timer);
+ WindowTracing.trace(
+ "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime);
+ }
+ if (firedTimers.isEmpty()) {
+ break;
+ }
+ timerCallback.onTimers(firedTimers);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
index 6598e30..dfdfd5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
@@ -19,16 +19,17 @@ package org.apache.beam.sdk.util.state;
import org.apache.beam.sdk.util.TimerInternals;
+
/**
- * A callback that processes a {@link TimerInternals.TimerData TimerData}.
+ * A callback that processes an Iterable of {@link TimerInternals.TimerData TimerData}.
*/
public interface TimerCallback {
- /** Processes the {@link TimerInternals.TimerData TimerData}. */
- void onTimer(TimerInternals.TimerData timer) throws Exception;
+ /** Processes an Iterable of {@link TimerInternals.TimerData TimerData}. */
+ void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception;
TimerCallback NO_OP = new TimerCallback() {
@Override
- public void onTimer(TimerInternals.TimerData timer) throws Exception {
+ public void onTimers(Iterable<TimerInternals.TimerData> timers) throws Exception {
// Nothing
}
};
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4282c67c/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
index 951803a..a3a7749 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.sdk.util.state;
+import static org.mockito.Matchers.argThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.joda.time.Instant;
@@ -24,6 +29,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -44,6 +50,37 @@ public class InMemoryTimerInternalsTest {
MockitoAnnotations.initMocks(this);
}
+ private static class TimersAre extends ArgumentMatcher<Iterable<TimerData>> {
+ final List<TimerData> expectedTimers;
+ TimersAre(List<TimerData> timers) {
+ expectedTimers = timers;
+ }
+
+ @Override
+ public boolean matches(Object actual) {
+ if (actual == null || !(actual instanceof Iterable)) {
+ return false;
+ }
+ @SuppressWarnings("unchecked")
+ Iterable<TimerData> timers = (Iterable<TimerData>) actual;
+
+ List<TimerData> actualTimers = new ArrayList();
+ for (TimerData timer : timers) {
+ actualTimers.add(timer);
+ }
+ return expectedTimers.equals(actualTimers);
+ }
+
+ @Override
+ public String toString() {
+ return "ordered timers " + expectedTimers.toString();
+ }
+ }
+
+ private static TimersAre timersAre(TimerData... timers) {
+ return new TimersAre(Arrays.asList(timers));
+ }
+
@Test
public void testFiringTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
@@ -54,7 +91,7 @@ public class InMemoryTimerInternalsTest {
underTest.setTimer(processingTime2);
underTest.advanceProcessingTime(timerCallback, new Instant(20));
- Mockito.verify(timerCallback).onTimer(processingTime1);
+ Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1)));
Mockito.verifyNoMoreInteractions(timerCallback);
// Advancing just a little shouldn't refire
@@ -63,13 +100,13 @@ public class InMemoryTimerInternalsTest {
// Adding the timer and advancing a little should refire
underTest.setTimer(processingTime1);
- Mockito.verify(timerCallback).onTimer(processingTime1);
+ Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1)));
underTest.advanceProcessingTime(timerCallback, new Instant(21));
Mockito.verifyNoMoreInteractions(timerCallback);
// And advancing the rest of the way should still have the other timer
underTest.advanceProcessingTime(timerCallback, new Instant(30));
- Mockito.verify(timerCallback).onTimer(processingTime2);
+ Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime2)));
Mockito.verifyNoMoreInteractions(timerCallback);
}
@@ -87,13 +124,11 @@ public class InMemoryTimerInternalsTest {
underTest.setTimer(watermarkTime2);
underTest.advanceInputWatermark(timerCallback, new Instant(30));
- Mockito.verify(timerCallback).onTimer(watermarkTime1);
- Mockito.verify(timerCallback).onTimer(watermarkTime2);
+ Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime1, watermarkTime2)));
Mockito.verifyNoMoreInteractions(timerCallback);
underTest.advanceProcessingTime(timerCallback, new Instant(30));
- Mockito.verify(timerCallback).onTimer(processingTime1);
- Mockito.verify(timerCallback).onTimer(processingTime2);
+ Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1, processingTime2)));
Mockito.verifyNoMoreInteractions(timerCallback);
}
@@ -107,10 +142,9 @@ public class InMemoryTimerInternalsTest {
underTest.setTimer(processingTime);
underTest.setTimer(processingTime);
underTest.advanceProcessingTime(timerCallback, new Instant(20));
+ Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime)));
underTest.advanceInputWatermark(timerCallback, new Instant(20));
-
- Mockito.verify(timerCallback).onTimer(processingTime);
- Mockito.verify(timerCallback).onTimer(watermarkTime);
+ Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime)));
Mockito.verifyNoMoreInteractions(timerCallback);
}
}
[2/2] incubator-beam git commit: This closes #1366
Posted by ke...@apache.org.
This closes #1366
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a7da91f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a7da91f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a7da91f
Branch: refs/heads/master
Commit: 4a7da91f063c0de07fca250d1016b85832225732
Parents: b75a764 4282c67
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 30 13:32:27 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Nov 30 13:32:27 2016 -0800
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 11 +-
.../beam/runners/core/PaneInfoTracker.java | 4 +
.../runners/core/ReduceFnContextFactory.java | 9 +-
.../beam/runners/core/ReduceFnRunner.java | 488 ++++++++++++-------
.../apache/beam/runners/core/WatermarkHold.java | 5 +
.../triggers/TriggerStateMachineRunner.java | 14 +-
.../beam/runners/core/ReduceFnTester.java | 4 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 5 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 6 +-
.../sdk/util/state/InMemoryTimerInternals.java | 22 +-
.../beam/sdk/util/state/TimerCallback.java | 9 +-
.../util/state/InMemoryTimerInternalsTest.java | 54 +-
12 files changed, 407 insertions(+), 224 deletions(-)
----------------------------------------------------------------------