You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 21:55:51 UTC
[1/2] incubator-beam git commit: Improve ReduceFnRunner prefetching
Repository: incubator-beam
Updated Branches:
refs/heads/master 9d380de96 -> c72708cd2
Improve ReduceFnRunner prefetching
- add prefetch* methods that prefetch the state read by the corresponding existing methods
- prefetch triggers in processElements
- replace onTimer with batched onTimers method to allow prefetching across
timers
Additionally remove deprecated TimerCallback usage
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b044f3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b044f3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b044f3f
Branch: refs/heads/master
Commit: 2b044f3f315655b863dbc7fd298f33c196fb8ef7
Parents: 9d380de
Author: Sam Whittle <sa...@google.com>
Authored: Thu Nov 10 12:59:49 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 6 13:54:50 2016 -0800
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 11 +-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 48 +-
.../beam/runners/core/PaneInfoTracker.java | 4 +
.../runners/core/ReduceFnContextFactory.java | 9 +-
.../beam/runners/core/ReduceFnRunner.java | 493 ++++++++++++-------
.../apache/beam/runners/core/WatermarkHold.java | 5 +
.../triggers/TriggerStateMachineRunner.java | 14 +-
.../beam/runners/core/ReduceFnTester.java | 77 ++-
.../triggers/TriggerStateMachineTester.java | 17 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 5 +-
10 files changed, 440 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 2082269..14171b3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -72,9 +71,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
@Override
public void processElement(ProcessContext c) throws Exception {
- KeyedWorkItem<K, InputT> element = c.element();
+ KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
- K key = c.element().key();
+ K key = keyedWorkItem.key();
TimerInternals timerInternals = c.windowingInternals().timerInternals();
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
@@ -92,10 +91,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
reduceFn,
c.getPipelineOptions());
- reduceFnRunner.processElements(element.elementsIterable());
- for (TimerData timer : element.timersIterable()) {
- reduceFnRunner.onTimer(timer);
- }
+ reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
+ reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
reduceFnRunner.persist();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index b4b366c..9189191 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -18,17 +18,18 @@
package org.apache.beam.runners.core;
import com.google.common.collect.Iterables;
+import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.util.state.TimerCallback;
import org.joda.time.Instant;
/**
@@ -59,9 +60,8 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
// timer manager from the context because it doesn't exist. So we create one and emulate the
// watermark, knowing that we have all data and it is in timestamp order.
InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
- timerInternals.advanceProcessingTime(TimerCallback.NO_OP, Instant.now());
- timerInternals.advanceSynchronizedProcessingTime(
- TimerCallback.NO_OP, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceProcessingTime(Instant.now());
+ timerInternals.advanceSynchronizedProcessingTime(Instant.now());
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
@@ -85,22 +85,50 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
reduceFnRunner.processElements(chunk);
// Then, since elements are sorted by their timestamp, advance the input watermark
- // to the first element, and fire any timers that may have been scheduled.
- timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
+ // to the first element.
+ timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
+ // Advance the processing times.
+ timerInternals.advanceProcessingTime(Instant.now());
+ timerInternals.advanceSynchronizedProcessingTime(Instant.now());
- // Fire any processing timers that need to fire
- timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
+ // Fire all the eligible timers.
+ fireEligibleTimers(timerInternals, reduceFnRunner);
// Leave the output watermark undefined. Since there's no late data in batch mode
// there's really no need to track it as we do for streaming.
}
// Finish any pending windows by advancing the input watermark to infinity.
- timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
// Finally, advance the processing time to infinity to fire any timers.
- timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ fireEligibleTimers(timerInternals, reduceFnRunner);
reduceFnRunner.persist();
}
+
+ private void fireEligibleTimers(InMemoryTimerInternals timerInternals,
+ ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner) throws Exception {
+ List<TimerInternals.TimerData> timers = new ArrayList<>();
+ while (true) {
+ TimerInternals.TimerData timer;
+ while ((timer = timerInternals.removeNextEventTimer()) != null) {
+ timers.add(timer);
+ }
+ while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ if (timers.isEmpty()) {
+ break;
+ }
+ reduceFnRunner.onTimers(timers);
+ timers.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 8140243..69a4cfd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -54,6 +54,10 @@ public class PaneInfoTracker {
state.access(PANE_INFO_TAG).clear();
}
+ public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
+ context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
+ }
+
/**
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
* info includes the timing for the pane, who's calculation is quite subtle.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 539126a..c5bda9b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateAccessor;
import org.apache.beam.sdk.util.state.StateContext;
@@ -117,7 +116,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
}
public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
- ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
+ PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
}
@@ -389,11 +388,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
private final StateAccessorImpl<K, W> state;
- private final ReadableState<PaneInfo> pane;
+ private final PaneInfo pane;
private final OnTriggerCallbacks<OutputT> callbacks;
private final TimersImpl timers;
- private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
+ private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
OnTriggerCallbacks<OutputT> callbacks) {
reduceFn.super();
this.state = state;
@@ -424,7 +423,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
@Override
public PaneInfo paneInfo() {
- return pane.read();
+ return pane;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a686f46..6f7bbcf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -21,12 +21,15 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,10 +61,8 @@ import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.TimerCallback;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
@@ -91,7 +92,7 @@ import org.joda.time.Instant;
* @param <OutputT> The output type that will be produced for each key.
* @param <W> The type of windows this operates on.
*/
-public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements TimerCallback {
+public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
/**
* The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
@@ -268,6 +269,32 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
return activeWindows.getActiveAndNewWindows().isEmpty();
}
+ private Set<W> openWindows(Collection<W> windows) {
+ Set<W> result = new HashSet<>();
+ for (W window : windows) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+ window, StateStyle.DIRECT);
+ if (!triggerRunner.isClosed(directContext.state())) {
+ result.add(window);
+ }
+ }
+ return result;
+ }
+
+ private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception {
+ Collection<W> result = new ArrayList<>();
+ // Keep only the windows whose triggers should fire.
+ for (W window : windows) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext =
+ contextFactory.base(window, StateStyle.DIRECT);
+ if (triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state())) {
+ result.add(window);
+ }
+ }
+ return result;
+ }
+
/**
* Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
* triggers, and window merging.
@@ -293,25 +320,54 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
* </ol>
*/
public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
+ if (!values.iterator().hasNext()) {
+ return;
+ }
+
+ // Determine all the windows for elements.
+ Set<W> windows = collectWindows(values);
// If an incoming element introduces a new window, attempt to merge it into an existing
// window eagerly.
- Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
+ Map<W, W> windowToMergeResult = mergeWindows(windows);
+ if (!windowToMergeResult.isEmpty()) {
+ // Update windows by removing all windows that were merged away and adding
+ // the windows they were merged to. We add after completing all the
+ // removals to avoid removing a window that was also added.
+ List<W> addedWindows = new ArrayList<>(windowToMergeResult.size());
+ for (Map.Entry<W, W> entry : windowToMergeResult.entrySet()) {
+ windows.remove(entry.getKey());
+ addedWindows.add(entry.getValue());
+ }
+ windows.addAll(addedWindows);
+ }
- Set<W> windowsToConsider = new HashSet<>();
+ prefetchWindowsForValues(windows);
- // Process each element, using the updated activeWindows determined by collectAndMergeWindows.
+ // All windows that are open before element processing may need to fire.
+ Set<W> windowsToConsider = openWindows(windows);
+
+ // Process each element, using the updated activeWindows determined by mergeWindows.
for (WindowedValue<InputT> value : values) {
- windowsToConsider.addAll(processElement(windowToMergeResult, value));
+ processElement(windowToMergeResult, value);
}
- // Trigger output from any window for which the trigger is ready
+ // Now that we've processed the elements, see if any of the windows need to fire.
+ // Prefetch state necessary to determine if the triggers should fire.
for (W mergedWindow : windowsToConsider) {
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(mergedWindow, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(mergedWindow, StateStyle.RENAMED);
- triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
- emitIfAppropriate(directContext, renamedContext);
+ triggerRunner.prefetchShouldFire(
+ mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state());
+ }
+ // Filter to windows that are firing.
+ Collection<W> windowsToFire = windowsThatShouldFire(windowsToConsider);
+ // Prefetch windows that are firing.
+ for (W window : windowsToFire) {
+ prefetchEmit(contextFactory.base(window, StateStyle.DIRECT),
+ contextFactory.base(window, StateStyle.RENAMED));
+ }
+ // Trigger output from firing windows.
+ for (W window : windowsToFire) {
+ emit(contextFactory.base(window, StateStyle.DIRECT),
+ contextFactory.base(window, StateStyle.RENAMED));
}
// We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -325,52 +381,61 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
/**
- * Extract the windows associated with the values, and invoke merge. Return a map
- * from windows to the merge result window. If a window is not in the domain of
- * the result map then it did not get merged into a different window.
+ * Extract the windows associated with the values.
*/
- private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
- throws Exception {
- // No-op if no merging can take place
+ private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
+ Set<W> windows = new HashSet<>();
+ for (WindowedValue<?> value : values) {
+ for (BoundedWindow untypedWindow : value.getWindows()) {
+ @SuppressWarnings("unchecked")
+ W window = (W) untypedWindow;
+ windows.add(window);
+ }
+ }
+ return windows;
+ }
+
+ /**
+ * Invoke merge for the given windows and return a map from windows to the
+ * merge result window. Windows that were not merged are not present in the
+ * map.
+ */
+ private Map<W, W> mergeWindows(Set<W> windows) throws Exception {
if (windowingStrategy.getWindowFn().isNonMerging()) {
- return ImmutableMap.of();
+ // Return an empty map, indicating that no window was merged.
+ return Collections.emptyMap();
}
+ Map<W, W> windowToMergeResult = new HashMap<>();
// Collect the windows from all elements (except those which are too late) and
// make sure they are already in the active window set or are added as NEW windows.
- for (WindowedValue<?> value : values) {
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
-
- // For backwards compat with pre 1.4 only.
- // We may still have ACTIVE windows with multiple state addresses, representing
- // a window who's state has not yet been eagerly merged.
- // We'll go ahead and merge that state now so that we don't have to worry about
- // this legacy case anywhere else.
- if (activeWindows.isActive(window)) {
- Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
- if (stateAddressWindows.size() > 1) {
- // This is a legacy window who's state has not been eagerly merged.
- // Do that now.
- ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
- contextFactory.forPremerge(window);
- reduceFn.onMerge(premergeContext);
- watermarkHold.onMerge(premergeContext);
- activeWindows.merged(window);
- }
+ for (W window : windows) {
+ // For backwards compat with pre 1.4 only.
+ // We may still have ACTIVE windows with multiple state addresses, representing
+ // a window whose state has not yet been eagerly merged.
+ // We'll go ahead and merge that state now so that we don't have to worry about
+ // this legacy case anywhere else.
+ if (activeWindows.isActive(window)) {
+ Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
+ if (stateAddressWindows.size() > 1) {
+ // This is a legacy window whose state has not been eagerly merged.
+ // Do that now.
+ ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
+ contextFactory.forPremerge(window);
+ reduceFn.onMerge(premergeContext);
+ watermarkHold.onMerge(premergeContext);
+ activeWindows.merged(window);
}
-
- // Add this window as NEW if it is not currently ACTIVE.
- // If we had already seen this window and closed its trigger, then the
- // window will not be currently ACTIVE. It will then be added as NEW here,
- // and fall into the merging logic as usual.
- activeWindows.ensureWindowExists(window);
}
+
+ // Add this window as NEW if it is not currently ACTIVE.
+ // If we had already seen this window and closed its trigger, then the
+ // window will not be currently ACTIVE. It will then be added as NEW here,
+ // and fall into the merging logic as usual.
+ activeWindows.ensureWindowExists(window);
}
// Merge all of the active windows and retain a mapping from source windows to result windows.
- Map<W, W> windowToMergeResult = new HashMap<>();
activeWindows.merge(new OnMergeCallback(windowToMergeResult));
return windowToMergeResult;
}
@@ -472,38 +537,50 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
/**
- * Process an element.
- *
- * @param value the value being processed
- * @return the set of windows in which the element was actually processed
+ * Redirect element windows to the ACTIVE windows they have been merged into.
+ * The compressed representation (value, {window1, window2, ...}) actually represents
+ * distinct elements (value, window1), (value, window2), ...
+ * so if window1 and window2 merge, the resulting window will contain both copies
+ * of the value.
*/
- private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
- throws Exception {
- // Redirect element windows to the ACTIVE windows they have been merged into.
- // The compressed representation (value, {window1, window2, ...}) actually represents
- // distinct elements (value, window1), (value, window2), ...
- // so if window1 and window2 merge, the resulting window will contain both copies
- // of the value.
- Collection<W> windows = new ArrayList<>();
- for (BoundedWindow untypedWindow : value.getWindows()) {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
- W mergeResult = windowToMergeResult.get(window);
- if (mergeResult == null) {
- mergeResult = window;
- }
- windows.add(mergeResult);
- }
+ private ImmutableSet<W> toMergedWindows(final Map<W, W> windowToMergeResult,
+ final Collection<? extends BoundedWindow> windows) {
+ return ImmutableSet.copyOf(
+ FluentIterable.from(windows).transform(
+ new Function<BoundedWindow, W>() {
+ @Override
+ public W apply(BoundedWindow untypedWindow) {
+ @SuppressWarnings("unchecked")
+ W window = (W) untypedWindow;
+ W mergedWindow = windowToMergeResult.get(window);
+ // If the element is not present in the map, the window is unmerged.
+ return (mergedWindow == null) ? window : mergedWindow;
+ }
+ }
+ ));
+ }
+ private void prefetchWindowsForValues(Collection<W> windows) {
// Prefetch in each of the windows if we're going to need to process triggers
for (W window : windows) {
- ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
- window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(
+ window, StateStyle.DIRECT);
triggerRunner.prefetchForValue(window, directContext.state());
}
+ }
+
+ /**
+ * Process an element.
+ *
+ * @param windowToMergeResult map of windows to merged windows. If a window is
+ * not present it is unmerged.
+ * @param value the value being processed
+ */
+ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
+ throws Exception {
+ ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, value.getWindows());
// Process the element for each (mergeResultWindow, not closed) window it belongs to.
- List<W> triggerableWindows = new ArrayList<>(windows.size());
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
@@ -518,7 +595,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
continue;
}
- triggerableWindows.add(window);
activeWindows.ensureWindowIsActive(window);
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
@@ -562,102 +638,152 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
// cannot take a trigger state from firing to non-firing.
// (We don't actually assert this since it is too slow.)
}
-
- return triggerableWindows;
}
/**
- * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
+ * Enriches TimerData with state necessary for processing a timer as well as
+ * common queries about a timer.
*/
- public void onTimer(TimerData timer) throws Exception {
- // Which window is the timer for?
- checkArgument(timer.getNamespace() instanceof WindowNamespace,
- "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
- @SuppressWarnings("unchecked")
- WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
- W window = windowNamespace.getWindow();
- ReduceFn<K, InputT, OutputT, W>.Context directContext =
- contextFactory.base(window, StateStyle.DIRECT);
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
- contextFactory.base(window, StateStyle.RENAMED);
+ private class EnrichedTimerData {
+ public final Instant timestamp;
+ public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
+ public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
+ // If this is an end-of-window timer then we may need to set a garbage collection timer
+ // if allowed lateness is non-zero.
+ public final boolean isEndOfWindow;
+ // If this is a garbage collection timer then we should trigger and
+ // garbage collect the window. We'll consider any timer at or after the
+ // end-of-window time to be a signal to garbage collect.
+ public final boolean isGarbageCollection;
+
+ EnrichedTimerData(
+ TimerData timer,
+ ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+ this.timestamp = timer.getTimestamp();
+ this.directContext = directContext;
+ this.renamedContext = renamedContext;
+ W window = directContext.window();
+ this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+ && timer.getTimestamp().equals(window.maxTimestamp());
+ Instant cleanupTime = garbageCollectionTime(window);
+ this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
+ }
// Has this window had its trigger finish?
// - The trigger may implement isClosed as constant false.
// - If the window function does not support windowing then all windows will be considered
// active.
// So we must take conjunction of activeWindows and triggerRunner state.
- boolean windowIsActiveAndOpen =
- activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
+ public boolean windowIsActiveAndOpen() {
+ return activeWindows.isActive(directContext.window())
+ && !triggerRunner.isClosed(directContext.state());
+ }
+ }
- if (!windowIsActiveAndOpen) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
+ public void onTimers(Iterable<TimerData> timers) throws Exception {
+ if (!timers.iterator().hasNext()) {
+ return;
}
- // If this is an end-of-window timer then we may need to set a garbage collection timer
- // if allowed lateness is non-zero.
- boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
- && timer.getTimestamp().equals(window.maxTimestamp());
-
- // If this is a garbage collection timer then we should trigger and garbage collect the window.
- // We'll consider any timer at or after the end-of-window time to be a signal to garbage
- // collect.
- Instant cleanupTime = garbageCollectionTime(window);
- boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
- && !timer.getTimestamp().isBefore(cleanupTime);
-
- if (isGarbageCollection) {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
-
- if (windowIsActiveAndOpen) {
- // We need to call onTrigger to emit the final pane if required.
- // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
- // and the watermark has passed the end of the window.
- @Nullable Instant newHold =
- onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
- checkState(newHold == null,
- "Hold placed at %s despite isFinished being true.", newHold);
+ // Create a reusable context for each timer and begin prefetching necessary
+ // state.
+ List<EnrichedTimerData> enrichedTimers = new LinkedList();
+ for (TimerData timer : timers) {
+ checkArgument(timer.getNamespace() instanceof WindowNamespace,
+ "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
+ @SuppressWarnings("unchecked")
+ WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
+ W window = windowNamespace.getWindow();
+ ReduceFn<K, InputT, OutputT, W>.Context directContext =
+ contextFactory.base(window, StateStyle.DIRECT);
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+ contextFactory.base(window, StateStyle.RENAMED);
+ EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext);
+ enrichedTimers.add(enrichedTimer);
+
+ // Perform prefetching of state to determine if the trigger should fire.
+ if (enrichedTimer.isGarbageCollection) {
+ triggerRunner.prefetchIsClosed(directContext.state());
+ } else {
+ triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
}
+ }
- // Cleanup flavor B: Clear all the remaining state for this window since we'll never
- // see elements for it again.
- clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
- } else {
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
- timerInternals.currentOutputWatermarkTime());
- if (windowIsActiveAndOpen) {
- emitIfAppropriate(directContext, renamedContext);
+ // For those windows that are active and open, prefetch the triggering or emitting state.
+ for (EnrichedTimerData timer : enrichedTimers) {
+ if (timer.windowIsActiveAndOpen()) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+ if (timer.isGarbageCollection) {
+ prefetchOnTrigger(directContext, timer.renamedContext);
+ } else if (triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state())) {
+ prefetchEmit(directContext, timer.renamedContext);
+ }
}
+ }
- if (isEndOfWindow) {
- // If the window strategy trigger includes a watermark trigger then at this point
- // there should be no data holds, either because we'd already cleared them on an
- // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
- // We could assert this but it is very expensive.
-
- // Since we are processing an on-time firing we should schedule the garbage collection
- // timer. (If getAllowedLateness is zero then the timer event will be considered a
- // cleanup event and handled by the above).
- // Note we must do this even if the trigger is finished so that we are sure to cleanup
- // any final trigger finished bits.
- checkState(
- windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
- "Unexpected zero getAllowedLateness");
- WindowTracing.debug(
- "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+ // Perform processing now that everything is prefetched.
+ for (EnrichedTimerData timer : enrichedTimers) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;
+
+ if (timer.isGarbageCollection) {
+ WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
+ + "inputWatermark:{}; outputWatermark:{}",
+ key, directContext.window(), timer.timestamp,
+ timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
- "Cleanup time %s is beyond end-of-time", cleanupTime);
- directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+
+ boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
+ if (windowIsActiveAndOpen) {
+ // We need to call onTrigger to emit the final pane if required.
+ // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
+ // and the watermark has passed the end of the window.
+ @Nullable
+ Instant newHold = onTrigger(
+ directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
+ checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
+ }
+
+ // Cleanup flavor B: Clear all the remaining state for this window since we'll never
+ // see elements for it again.
+ clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
+ } else {
+ WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+ + "inputWatermark:{}; outputWatermark:{}",
+ key, directContext.window(), timer.timestamp,
+ timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ if (timer.windowIsActiveAndOpen()
+ && triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state())) {
+ emit(directContext, renamedContext);
+ }
+
+ if (timer.isEndOfWindow) {
+ // If the window strategy trigger includes a watermark trigger then at this point
+ // there should be no data holds, either because we'd already cleared them on an
+ // earlier onTrigger, or because we just cleared them on the above emit.
+ // We could assert this but it is very expensive.
+
+ // Since we are processing an on-time firing we should schedule the garbage collection
+ // timer. (If getAllowedLateness is zero then the timer event will be considered a
+ // cleanup event and handled by the above).
+ // Note we must do this even if the trigger is finished so that we are sure to cleanup
+ // any final trigger finished bits.
+ checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+ "Unexpected zero getAllowedLateness");
+ Instant cleanupTime = garbageCollectionTime(directContext.window());
+ WindowTracing.debug(
+ "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
+ + "inputWatermark:{}; outputWatermark:{}",
+ key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+ "Cleanup time %s is beyond end-of-time", cleanupTime);
+ directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+ }
}
}
}
@@ -666,7 +792,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
* Clear all the state associated with {@code context}'s window.
* Should only be invoked if we know all future elements for this window will be considered
* beyond allowed lateness.
- * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
+ * This is a superset of the clearing done by {@link #emit} below since:
* <ol>
* <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
* closed again.
@@ -692,10 +818,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
} else {
// If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
// For (1), if !activeWindows.isActive then the window must be merging and has been
- // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired
+ // explicitly removed by emit. But in that case the trigger must have fired
// and been closed, so this case reduces to (2).
// For (2), if triggerRunner.isClosed then the trigger was fired and entered the
- // closed state. In that case emitIfAppropriate will have cleared all state in
+ // closed state. In that case emit will have cleared all state in
// reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
// We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
// Since the trigger fired the existing watermark holds must have been cleared, and since
@@ -737,17 +863,23 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
return false;
}
+ private void prefetchEmit(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+ triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
+ triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
+ triggerRunner.prefetchIsClosed(directContext.state());
+ prefetchOnTrigger(directContext, renamedContext);
+ }
+
/**
- * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
+ * Emit if a trigger is ready to fire or timers require it, and cleanup state.
*/
- private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ private void emit(
+ ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
throws Exception {
- if (!triggerRunner.shouldFire(
- directContext.window(), directContext.timers(), directContext.state())) {
- // Ignore unless trigger is ready to fire
- return;
- }
+ checkState(triggerRunner.shouldFire(
+ directContext.window(), directContext.timers(), directContext.state()));
// Inform the trigger of the transition to see if it is finished
triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
@@ -782,7 +914,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
/**
- * Do we need to emit a pane?
+ * Do we need to emit?
*/
private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
if (!isEmpty) {
@@ -800,6 +932,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
return false;
}
+ private void prefetchOnTrigger(
+ final ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+ paneInfoTracker.prefetchPaneInfo(directContext);
+ watermarkHold.prefetchExtract(renamedContext);
+ nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
+ reduceFn.prefetchOnTrigger(directContext.state());
+ }
+
/**
* Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
*
@@ -813,25 +954,17 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
throws Exception {
Instant inputWM = timerInternals.currentInputWatermarkTime();
- // Prefetch necessary states
- ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
- watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
- ReadableState<PaneInfo> paneFuture =
- paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
- ReadableState<Boolean> isEmptyFuture =
- nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-
- reduceFn.prefetchOnTrigger(directContext.state());
- triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
-
// Calculate the pane info.
- final PaneInfo pane = paneFuture.read();
- // Extract the window hold, and as a side effect clear it.
+ final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
- WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+ // Extract the window hold, and as a side effect clear it.
+ final WatermarkHold.OldAndNewHolds pair =
+ watermarkHold.extractAndRelease(renamedContext, isFinished).read();
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;
+ final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
+
if (newHold != null) {
// We can't be finished yet.
checkState(
@@ -863,11 +996,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> impleme
}
// Only emit a pane if it has data or empty panes are observable.
- if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
+ if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
// Run reduceFn.onTrigger method.
final List<W> windows = Collections.singletonList(directContext.window());
ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
- contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
+ contextFactory.forTrigger(directContext.window(), pane, StateStyle.RENAMED,
new OnTriggerCallbacks<OutputT>() {
@Override
public void output(OutputT toOutput) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 3c04571..7f1afcc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -444,6 +444,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable {
}
}
+ public void prefetchExtract(final ReduceFn<?, ?, ?, W>.Context context) {
+ context.state().access(elementHoldTag).readLater();
+ context.state().access(EXTRA_HOLD_TAG).readLater();
+ }
+
/**
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index 9f03216..2f277eb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -99,25 +99,25 @@ public class TriggerStateMachineRunner<W extends BoundedWindow> {
return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
}
- public void prefetchForValue(W window, StateAccessor<?> state) {
+ public void prefetchIsClosed(StateAccessor<?> state) {
if (isFinishedSetNeeded()) {
state.access(FINISHED_BITS_TAG).readLater();
}
+ }
+
+ public void prefetchForValue(W window, StateAccessor<?> state) {
+ prefetchIsClosed(state);
rootTrigger.getSpec().prefetchOnElement(
contextFactory.createStateAccessor(window, rootTrigger));
}
public void prefetchOnFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
+ prefetchIsClosed(state);
rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
}
public void prefetchShouldFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
+ prefetchIsClosed(state);
rootTrigger.getSpec().prefetchShouldFire(
contextFactory.createStateAccessor(window, rootTrigger));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 337be23..db0cf91 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -78,7 +78,6 @@ import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -100,7 +99,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
private final TestInMemoryStateInternals<String> stateInternals =
new TestInMemoryStateInternals<>(KEY);
- private final TestTimerInternals timerInternals = new TestTimerInternals();
+ private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
private final WindowFn<Object, W> windowFn;
private final TestOutputWindowedValue testOutputter;
@@ -443,8 +442,29 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
* fire. Then advance the output watermark as far as possible.
*/
public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+ timerInternals.advanceInputWatermark(newInputWatermark);
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- timerInternals.advanceInputWatermark(runner, newInputWatermark);
+ while (true) {
+ TimerData timer;
+ List<TimerInternals.TimerData> timers = new ArrayList<>();
+ while ((timer = timerInternals.removeNextEventTimer()) != null) {
+ timers.add(timer);
+ }
+ if (timers.isEmpty()) {
+ break;
+ }
+ runner.onTimers(timers);
+ }
+ if (autoAdvanceOutputWatermark) {
+ Instant hold = stateInternals.earliestWatermarkHold();
+ if (hold == null) {
+ WindowTracing.trace(
+ "TestInMemoryTimerInternals.advanceInputWatermark: no holds, "
+ + "so output watermark = input watermark");
+ hold = timerInternals.currentInputWatermarkTime();
+ }
+ advanceOutputWatermark(hold);
+ }
runner.persist();
}
@@ -458,8 +478,19 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
/** Advance the processing time to the specified time, firing any timers that should fire. */
public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+ timerInternals.advanceProcessingTime(newProcessingTime);
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- timerInternals.advanceProcessingTime(runner, newProcessingTime);
+ while (true) {
+ TimerData timer;
+ List<TimerInternals.TimerData> timers = new ArrayList<>();
+ while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ if (timers.isEmpty()) {
+ break;
+ }
+ runner.onTimers(timers);
+ }
runner.persist();
}
@@ -467,9 +498,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
* Advance the synchronized processing time to the specified time,
* firing any timers that should fire.
*/
- public void advanceSynchronizedProcessingTime(Instant newProcessingTime) throws Exception {
+ public void advanceSynchronizedProcessingTime(
+ Instant newSynchronizedProcessingTime) throws Exception {
+ timerInternals.advanceSynchronizedProcessingTime(newSynchronizedProcessingTime);
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- timerInternals.advanceSynchronizedProcessingTime(runner, newProcessingTime);
+ while (true) {
+ TimerData timer;
+ List<TimerInternals.TimerData> timers = new ArrayList<>();
+ while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+ timers.add(timer);
+ }
+ if (timers.isEmpty()) {
+ break;
+ }
+ runner.onTimers(timers);
+ }
runner.persist();
}
@@ -509,8 +552,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- runner.onTimer(
+ ArrayList timers = new ArrayList(1);
+ timers.add(
TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
+ runner.onTimers(timers);
runner.persist();
}
@@ -601,22 +646,4 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return sum;
}
}
-
- private class TestTimerInternals extends InMemoryTimerInternals {
- @Override
- public void advanceInputWatermark(TimerCallback timerCallback, Instant newInputWatermark)
- throws Exception {
- super.advanceInputWatermark(timerCallback, newInputWatermark);
- if (autoAdvanceOutputWatermark) {
- Instant hold = stateInternals.earliestWatermarkHold();
- if (hold == null) {
- WindowTracing.trace(
- "TestInMemoryTimerInternals.advanceInputWatermark: no holds, "
- + "so output watermark = input watermark");
- hold = currentInputWatermarkTime();
- }
- advanceOutputWatermark(hold);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 1ccca17..ed5ce9c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -221,14 +220,22 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
* possible.
*/
public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
- // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
- timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark);
+ timerInternals.advanceInputWatermark(newInputWatermark);
+ while (timerInternals.removeNextEventTimer() != null) {
+ // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+ }
}
/** Advance the processing time to the specified time. */
public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
- // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
- timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime);
+ timerInternals.advanceProcessingTime(newProcessingTime);
+ while (timerInternals.removeNextProcessingTimer() != null) {
+ // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+ }
+ timerInternals.advanceSynchronizedProcessingTime(newProcessingTime);
+ while (timerInternals.removeNextSynchronizedProcessingTimer() != null) {
+ // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b044f3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 5c6b2c1..87cbbcd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -201,9 +200,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
// Drop any elements within expired windows
reduceFnRunner.processElements(
dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
- for (TimerData timer : workItem.timersIterable()) {
- reduceFnRunner.onTimer(timer);
- }
+ reduceFnRunner.onTimers(workItem.timersIterable());
reduceFnRunner.persist();
}
[2/2] incubator-beam git commit: This closes #1519
Posted by ke...@apache.org.
This closes #1519
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c72708cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c72708cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c72708cd
Branch: refs/heads/master
Commit: c72708cd2c5725a739a4198bcede1560812bb5af
Parents: 9d380de 2b044f3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 13:55:32 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 6 13:55:32 2016 -0800
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 11 +-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 48 +-
.../beam/runners/core/PaneInfoTracker.java | 4 +
.../runners/core/ReduceFnContextFactory.java | 9 +-
.../beam/runners/core/ReduceFnRunner.java | 493 ++++++++++++-------
.../apache/beam/runners/core/WatermarkHold.java | 5 +
.../triggers/TriggerStateMachineRunner.java | 14 +-
.../beam/runners/core/ReduceFnTester.java | 77 ++-
.../triggers/TriggerStateMachineTester.java | 17 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 5 +-
10 files changed, 440 insertions(+), 243 deletions(-)
----------------------------------------------------------------------